Merge "Migrate octavia resources"
This commit is contained in:
commit
091466b40e
@ -37,8 +37,15 @@ class ApiReplayCli(object):
|
||||
dest_os_auth_url=args.dest_os_auth_url,
|
||||
dest_plugin=args.dest_plugin,
|
||||
use_old_keystone=args.use_old_keystone,
|
||||
max_retry=args.max_retry,
|
||||
logfile=args.logfile)
|
||||
octavia_os_tenant_name=args.octavia_os_project_name,
|
||||
octavia_os_tenant_domain_id=args.octavia_os_project_domain_id,
|
||||
octavia_os_username=args.octavia_os_username,
|
||||
octavia_os_user_domain_id=args.octavia_os_user_domain_id,
|
||||
octavia_os_password=args.octavia_os_password,
|
||||
octavia_os_auth_url=args.octavia_os_auth_url,
|
||||
neutron_conf=args.neutron_conf,
|
||||
logfile=args.logfile,
|
||||
max_retry=args.max_retry)
|
||||
|
||||
def _setup_argparse(self):
|
||||
parser = argparse.ArgumentParser()
|
||||
@ -115,11 +122,42 @@ class ApiReplayCli(object):
|
||||
action='store_true',
|
||||
help="Use old keystone client for source authentication.")
|
||||
|
||||
# Arguments required to connect to the octavia client (read only)
|
||||
parser.add_argument(
|
||||
"--octavia-os-username",
|
||||
help="The octavia os-username to use to "
|
||||
"gather loadbalancers resources with.")
|
||||
parser.add_argument(
|
||||
"--octavia-os-user-domain-id",
|
||||
default=DEFAULT_DOMAIN_ID,
|
||||
help="The octavia os-user-domain-id to use to "
|
||||
"gather loadbalancers resources with.")
|
||||
parser.add_argument(
|
||||
"--octavia-os-project-name",
|
||||
help="The octavia os-project-name to use to "
|
||||
"gather loadbalancers resource with.")
|
||||
parser.add_argument(
|
||||
"--octavia-os-project-domain-id",
|
||||
default=DEFAULT_DOMAIN_ID,
|
||||
help="The octavia os-project-domain-id to use to "
|
||||
"gather loadbalancers resource with.")
|
||||
parser.add_argument(
|
||||
"--octavia-os-password",
|
||||
help="The password for this octavia user.")
|
||||
parser.add_argument(
|
||||
"--octavia-os-auth-url",
|
||||
help="They keystone api endpoint for this octavia user.")
|
||||
|
||||
parser.add_argument(
|
||||
"--logfile",
|
||||
default=DEFAULT_LOGFILE,
|
||||
help="Output logfile.")
|
||||
|
||||
parser.add_argument(
|
||||
"--neutron_conf",
|
||||
default='/etc/neutron/neutron.conf',
|
||||
help="neutron config file path.")
|
||||
|
||||
parser.add_argument(
|
||||
"--max-retry",
|
||||
default=10,
|
||||
|
@ -10,8 +10,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import time
|
||||
import socket
|
||||
|
||||
import six
|
||||
|
||||
@ -20,13 +21,20 @@ from keystoneauth1 import session
|
||||
from neutronclient.common import exceptions as n_exc
|
||||
from neutronclient.v2_0 import client
|
||||
from octaviaclient.api.v2 import octavia
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_utils import excutils
|
||||
|
||||
from neutron.common import config as neutron_config
|
||||
from octavia_lib.api.drivers import driver_lib
|
||||
|
||||
from vmware_nsx.api_replay import utils
|
||||
from vmware_nsx.common import nsxv_constants
|
||||
from vmware_nsx.services.lbaas.octavia import constants as d_const
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
LOG = logging.getLogger(__name__)
|
||||
LOG.setLevel(logging.INFO)
|
||||
|
||||
# For internal testing only
|
||||
use_old_keystone_on_dest = False
|
||||
@ -41,7 +49,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
dest_os_username, dest_os_user_domain_id,
|
||||
dest_os_tenant_name, dest_os_tenant_domain_id,
|
||||
dest_os_password, dest_os_auth_url, dest_plugin,
|
||||
use_old_keystone, logfile, max_retry):
|
||||
use_old_keystone,
|
||||
octavia_os_username, octavia_os_user_domain_id,
|
||||
octavia_os_tenant_name, octavia_os_tenant_domain_id,
|
||||
octavia_os_password, octavia_os_auth_url,
|
||||
neutron_conf, logfile, max_retry):
|
||||
|
||||
# Init config and logging
|
||||
if neutron_conf:
|
||||
neutron_config.init(args=['--config-file', neutron_conf])
|
||||
|
||||
if logfile:
|
||||
f_handler = logging.FileHandler(logfile)
|
||||
@ -54,6 +70,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
|
||||
# connect to both clients
|
||||
if use_old_keystone:
|
||||
LOG.info("Using old keystone for source neutron")
|
||||
# Since we are not sure what keystone version will be used on the
|
||||
# source setup, we add an option to use the v2 client
|
||||
self.source_neutron = client.Client(
|
||||
@ -61,7 +78,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
tenant_name=source_os_tenant_name,
|
||||
password=source_os_password,
|
||||
auth_url=source_os_auth_url)
|
||||
self.source_octavia = None
|
||||
else:
|
||||
self.source_neutron = self.connect_to_client(
|
||||
username=source_os_username,
|
||||
@ -70,21 +86,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
tenant_domain_id=source_os_tenant_domain_id,
|
||||
password=source_os_password,
|
||||
auth_url=source_os_auth_url)
|
||||
self.source_octavia = self.connect_to_octavia(
|
||||
username=source_os_username,
|
||||
user_domain_id=source_os_user_domain_id,
|
||||
tenant_name=source_os_tenant_name,
|
||||
tenant_domain_id=source_os_tenant_domain_id,
|
||||
password=source_os_password,
|
||||
auth_url=source_os_auth_url)
|
||||
|
||||
if use_old_keystone_on_dest:
|
||||
LOG.info("Using old keystone for destination neutron")
|
||||
self.dest_neutron = client.Client(
|
||||
username=dest_os_username,
|
||||
tenant_name=dest_os_tenant_name,
|
||||
password=dest_os_password,
|
||||
auth_url=dest_os_auth_url)
|
||||
self.dest_octavia = None
|
||||
else:
|
||||
self.dest_neutron = self.connect_to_client(
|
||||
username=dest_os_username,
|
||||
@ -93,13 +102,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
tenant_domain_id=dest_os_tenant_domain_id,
|
||||
password=dest_os_password,
|
||||
auth_url=dest_os_auth_url)
|
||||
self.dest_octavia = self.connect_to_octavia(
|
||||
username=dest_os_username,
|
||||
user_domain_id=dest_os_user_domain_id,
|
||||
tenant_name=dest_os_tenant_name,
|
||||
tenant_domain_id=dest_os_tenant_domain_id,
|
||||
password=dest_os_password,
|
||||
auth_url=dest_os_auth_url)
|
||||
|
||||
if octavia_os_auth_url:
|
||||
self.octavia = self.connect_to_octavia(
|
||||
username=octavia_os_username,
|
||||
user_domain_id=octavia_os_user_domain_id,
|
||||
tenant_name=octavia_os_tenant_name,
|
||||
tenant_domain_id=octavia_os_tenant_domain_id,
|
||||
password=octavia_os_password,
|
||||
auth_url=octavia_os_auth_url)
|
||||
else:
|
||||
self.octavia = None
|
||||
|
||||
self.dest_plugin = dest_plugin
|
||||
|
||||
@ -112,7 +125,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
self.migrate_floatingips()
|
||||
self.migrate_routers_routes(routers_routes)
|
||||
self.migrate_fwaas()
|
||||
if self.source_octavia and self.dest_octavia:
|
||||
if self.octavia:
|
||||
self.migrate_octavia()
|
||||
LOG.info("NSX migration is Done.")
|
||||
|
||||
@ -430,6 +443,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
source_networks = self.source_neutron.list_networks()['networks']
|
||||
dest_networks = self.dest_neutron.list_networks()['networks']
|
||||
dest_ports = self.dest_neutron.list_ports()['ports']
|
||||
dest_subnets = self.dest_neutron.list_subnets()['subnets']
|
||||
|
||||
remove_qos = False
|
||||
if not self.dest_qos_support:
|
||||
@ -485,6 +499,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
dhcp_subnets = []
|
||||
count_dhcp_subnet = 0
|
||||
for subnet_id in network['subnets']:
|
||||
|
||||
# only create subnet if the dest server doesn't have it
|
||||
if self.have_id(subnet_id, dest_subnets):
|
||||
LOG.info("Skip network %s: Already exists on the "
|
||||
"destination", network['id'])
|
||||
continue
|
||||
subnet = self.find_subnet_by_id(subnet_id, source_subnets)
|
||||
body = self.prepare_subnet(subnet)
|
||||
|
||||
@ -528,10 +548,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
except n_exc.BadRequest as e:
|
||||
LOG.error("Failed to create subnet: %(subnet)s: %(e)s",
|
||||
{'subnet': subnet, 'e': e})
|
||||
# NOTE(arosen): this occurs here if you run the script
|
||||
# multiple times as we don't currently
|
||||
# preserve the subnet_id. Also, 409 would be a better
|
||||
# response code for this in neutron :(
|
||||
|
||||
# create the ports on the network
|
||||
ports = self.get_ports_on_network(network['id'], source_ports)
|
||||
@ -741,93 +757,75 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
|
||||
LOG.info("FWaaS V2 migration done")
|
||||
|
||||
def _wait_for_lb_up(self, lb_id):
|
||||
retry_num = 0
|
||||
while retry_num < self.max_retry:
|
||||
lb = self.dest_octavia.load_balancer_show(lb_id)
|
||||
if not lb['provisioning_status'].startswith('PENDING'):
|
||||
if lb['provisioning_status'] == 'ACTIVE':
|
||||
return True
|
||||
# No point in trying any more
|
||||
return False
|
||||
retry_num = retry_num + 1
|
||||
time.sleep(1)
|
||||
return False
|
||||
def _delete_octavia_lb(self, body):
|
||||
kw = {'loadbalancer': body}
|
||||
self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw)
|
||||
|
||||
def _migrate_octavia_lb(self, lb, orig_map):
|
||||
# Creating all loadbalancers resources on the new nsx driver
|
||||
# using RPC calls to the plugin listener.
|
||||
|
||||
# Create the loadbalancer:
|
||||
body = self.prepare_lb_loadbalancer(lb)
|
||||
try:
|
||||
new_lb = self.dest_octavia.load_balancer_create(
|
||||
json={'loadbalancer': body})['loadbalancer']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s",
|
||||
{'lb': lb, 'e': e})
|
||||
return
|
||||
new_lb_id = new_lb['id']
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active", new_lb_id)
|
||||
lb_body = self.prepare_lb_loadbalancer(lb)
|
||||
kw = {'loadbalancer': lb_body}
|
||||
if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw):
|
||||
LOG.error("Failed to create loadbalancer (%s)", lb_body)
|
||||
self._delete_octavia_lb(lb_body)
|
||||
return
|
||||
|
||||
lb_id = lb['id']
|
||||
lb_body_for_deletion = copy.deepcopy(lb_body)
|
||||
lb_body_for_deletion['listeners'] = []
|
||||
lb_body_for_deletion['pools'] = []
|
||||
|
||||
listeners_map = {}
|
||||
for listener_dict in lb.get('listeners', []):
|
||||
listener_id = listener_dict['id']
|
||||
listener = orig_map['listeners'][listener_id]
|
||||
body = self.prepare_lb_listener(listener)
|
||||
# Update loadbalancer in listener
|
||||
body['loadbalancer_id'] = new_lb_id
|
||||
try:
|
||||
new_listener = self.dest_octavia.listener_create(
|
||||
json={'listener': body})['listener']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create listener (%(list)s): %(e)s",
|
||||
{'list': listener, 'e': e})
|
||||
body = self.prepare_lb_listener(listener, lb_body)
|
||||
body['loadbalancer'] = lb_body
|
||||
body['loadbalancer_id'] = lb_id
|
||||
kw = {'listener': body, 'cert': None}
|
||||
if not self.octavia_rpc_client.call({}, 'listener_create', **kw):
|
||||
LOG.error("Failed to create loadbalancer %(lb)s listener "
|
||||
"(%(list)s)", {'list': listener, 'lb': lb_id})
|
||||
self._delete_octavia_lb(lb_body_for_deletion)
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active after "
|
||||
"listener creation", new_lb_id)
|
||||
return
|
||||
# map old-id to new
|
||||
listeners_map[listener_id] = new_listener['id']
|
||||
listeners_map[listener_id] = body
|
||||
lb_body_for_deletion['listeners'].append(body)
|
||||
|
||||
pools_map = {}
|
||||
for pool_dict in lb.get('pools', []):
|
||||
pool_id = pool_dict['id']
|
||||
pool = orig_map['pools'][pool_id]
|
||||
body = self.prepare_lb_pool(pool)
|
||||
# Update loadbalancer and listeners in pool
|
||||
body['loadbalancer_id'] = new_lb_id
|
||||
pool_body = self.prepare_lb_pool(pool, lb_body)
|
||||
# Update listeners in pool
|
||||
if pool.get('listeners'):
|
||||
body['listener_id'] = listeners_map[pool['listeners'][0]['id']]
|
||||
try:
|
||||
new_pool = self.dest_octavia.pool_create(
|
||||
json={'pool': body})['pool']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create pool (%(pool)s): %(e)s",
|
||||
{'pool': pool, 'e': e})
|
||||
listener_id = pool['listeners'][0]['id']
|
||||
pool_body['listener_id'] = listener_id
|
||||
pool_body['listener'] = listeners_map.get(listener_id)
|
||||
kw = {'pool': pool_body}
|
||||
if not self.octavia_rpc_client.call({}, 'pool_create', **kw):
|
||||
LOG.error("Failed to create loadbalancer %(lb)s pool "
|
||||
"(%(pool)s)", {'pool': pool, 'lb': lb_id})
|
||||
self._delete_octavia_lb(lb_body_for_deletion)
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active after "
|
||||
"pool creation", new_lb_id)
|
||||
return
|
||||
# map old-id to new
|
||||
pools_map[pool_id] = new_pool['id']
|
||||
lb_body_for_deletion['pools'].append(pool)
|
||||
|
||||
# Add members to this pool
|
||||
source_members = self.source_octavia.member_list(pool_id)[
|
||||
'members']
|
||||
for member in source_members:
|
||||
body = self.prepare_lb_member(member)
|
||||
try:
|
||||
self.dest_octavia.member_create(
|
||||
new_pool['id'], json={'member': body})['member']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create member (%(member)s): %(e)s",
|
||||
{'member': member, 'e': e})
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active "
|
||||
"after member creation", new_lb_id)
|
||||
pool_members = self.octavia.member_list(pool_id)['members']
|
||||
for member in pool_members:
|
||||
body = self.prepare_lb_member(member, lb_body)
|
||||
if not member['subnet_id']:
|
||||
# Add the loadbalancer subnet
|
||||
body['subnet_id'] = lb_body['vip_subnet_id']
|
||||
|
||||
body['pool'] = pool_body
|
||||
kw = {'member': body}
|
||||
if not self.octavia_rpc_client.call({}, 'member_create', **kw):
|
||||
LOG.error("Failed to create pool %(pool)s member "
|
||||
"(%(member)s)",
|
||||
{'member': member, 'pool': pool_id})
|
||||
self._delete_octavia_lb(lb_body_for_deletion)
|
||||
return
|
||||
|
||||
# Add pool health monitor
|
||||
@ -835,61 +833,41 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
hm_id = pool['healthmonitor_id']
|
||||
hm = orig_map['hms'][hm_id]
|
||||
body = self.prepare_lb_hm(hm)
|
||||
body['pool'] = pool_body
|
||||
# Update pool id in hm
|
||||
body['pool_id'] = new_pool['id']
|
||||
try:
|
||||
self.dest_octavia.health_monitor_create(
|
||||
json={'healthmonitor': body})['healthmonitor']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s",
|
||||
{'hm': hm, 'e': e})
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active "
|
||||
"after health monitor creation", new_lb_id)
|
||||
kw = {'healthmonitor': body}
|
||||
if not self.octavia_rpc_client.call(
|
||||
{}, 'healthmonitor_create', **kw):
|
||||
LOG.error("Failed to create pool %(pool)s healthmonitor "
|
||||
"(%(hm)s)", {'hm': hm, 'pool': pool_id})
|
||||
self._delete_octavia_lb(lb_body_for_deletion)
|
||||
return
|
||||
lb_body_for_deletion['pools'][-1]['healthmonitor'] = body
|
||||
|
||||
# Add listeners L7 policies
|
||||
for listener_id in listeners_map:
|
||||
for listener_id in listeners_map.keys():
|
||||
listener = orig_map['listeners'][listener_id]
|
||||
for l7pol_dict in listener.get('l7policies', []):
|
||||
l7pol = orig_map['l7pols'][l7pol_dict['id']]
|
||||
body = self.prepare_lb_l7policy(l7pol)
|
||||
# Update pool id in l7 policy
|
||||
body['listener_id'] = listeners_map[listener_id]
|
||||
# update redirect_pool_id
|
||||
if l7pol.get('redirect_pool_id'):
|
||||
body['redirect_pool_id'] = pools_map[
|
||||
l7pol['redirect_pool_id']]
|
||||
try:
|
||||
new_pol = self.dest_octavia.l7policy_create(
|
||||
json={'l7policy': body})['l7policy']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create l7policy (%(l7pol)s): "
|
||||
"%(e)s", {'l7pol': l7pol, 'e': e})
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become active "
|
||||
"after L7 policy creation", new_lb_id)
|
||||
return
|
||||
l7_pol_id = l7pol_dict['id']
|
||||
l7pol = orig_map['l7pols'][l7_pol_id]
|
||||
pol_body = self.prepare_lb_l7policy(l7pol)
|
||||
|
||||
# Add the rules of this policy
|
||||
source_l7rules = self.source_octavia.l7rule_list(
|
||||
l7pol['id'])['rules']
|
||||
source_l7rules = self.octavia.l7rule_list(
|
||||
l7_pol_id)['rules']
|
||||
for rule in source_l7rules:
|
||||
body = self.prepare_lb_l7rule(rule)
|
||||
try:
|
||||
self.dest_octavia.l7rule_create(
|
||||
new_pol['id'], json={'rule': body})['rule']
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create l7rule (%(rule)s): "
|
||||
"%(e)s", {'rule': rule, 'e': e})
|
||||
return
|
||||
if not self._wait_for_lb_up(new_lb_id):
|
||||
LOG.error("New loadbalancer %s does not become "
|
||||
"active after L7 rule creation",
|
||||
new_lb_id)
|
||||
return
|
||||
rule_body = self.prepare_lb_l7rule(rule)
|
||||
pol_body['rules'].append(rule_body)
|
||||
|
||||
kw = {'l7policy': pol_body}
|
||||
if not self.octavia_rpc_client.call(
|
||||
{}, 'l7policy_create', **kw):
|
||||
LOG.error("Failed to create l7policy (%(l7pol)s)",
|
||||
{'l7pol': l7pol})
|
||||
self._delete_octavia_lb(lb_body_for_deletion)
|
||||
return
|
||||
|
||||
LOG.info("Created loadbalancer %s", lb_id)
|
||||
|
||||
def _map_orig_objects_of_type(self, source_objects):
|
||||
result = {}
|
||||
@ -908,35 +886,58 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
|
||||
return result
|
||||
|
||||
def migrate_octavia(self):
|
||||
"""Migrates Octavia objects from source to dest neutron.
|
||||
|
||||
Right now the Octavia objects are created with new IDS, and
|
||||
do not keep their original IDs
|
||||
"""Migrates Octavia NSX objects to the new neutron driver.
|
||||
The Octavia proccess & DB will remain unchanged.
|
||||
Using RPC connection to connect directly with the new plugin driver.
|
||||
"""
|
||||
# TODO(asarfaty): Keep original ids
|
||||
# Read all existing octavia resources
|
||||
try:
|
||||
source_loadbalancers = self.source_octavia.\
|
||||
load_balancer_list()['loadbalancers']
|
||||
source_listeners = self.source_octavia.listener_list()['listeners']
|
||||
source_pools = self.source_octavia.pool_list()['pools']
|
||||
source_hms = self.source_octavia.\
|
||||
health_monitor_list()['healthmonitors']
|
||||
source_l7pols = self.source_octavia.l7policy_list()['l7policies']
|
||||
loadbalancers = self.octavia.load_balancer_list()['loadbalancers']
|
||||
listeners = self.octavia.listener_list()['listeners']
|
||||
pools = self.octavia.pool_list()['pools']
|
||||
hms = self.octavia.health_monitor_list()['healthmonitors']
|
||||
l7pols = self.octavia.l7policy_list()['l7policies']
|
||||
except Exception as e:
|
||||
# Octavia might be disabled in the source
|
||||
LOG.info("Octavia was not found on the source server: %s", e)
|
||||
LOG.info("Octavia was not found on the server: %s", e)
|
||||
return
|
||||
|
||||
try:
|
||||
self.dest_octavia.load_balancer_list()
|
||||
except Exception as e:
|
||||
# Octavia might be disabled in the destination
|
||||
LOG.warning("Skipping Octavia migration. Octavia was not found "
|
||||
"on the destination server: %s", e)
|
||||
return
|
||||
orig_map = self._map_orig_lb_objects(source_listeners, source_pools,
|
||||
source_hms, source_l7pols)
|
||||
total_num = len(source_loadbalancers)
|
||||
# Init the RPC connection for sending messages to the octavia driver
|
||||
topic = d_const.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
|
||||
transport = messaging.get_rpc_transport(cfg.CONF)
|
||||
target = messaging.Target(topic=topic, exchange="common",
|
||||
namespace='control', fanout=False,
|
||||
version='1.0')
|
||||
self.octavia_rpc_client = messaging.RPCClient(transport, target)
|
||||
|
||||
# Initialize RPC listener for getting status updates from the driver
|
||||
# so that the rsource status will not change in the octavia DB
|
||||
topic = d_const.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
|
||||
server = socket.gethostname()
|
||||
target = messaging.Target(topic=topic, server=server,
|
||||
exchange="common", fanout=False)
|
||||
|
||||
class MigrationOctaviaDriverEndpoint(driver_lib.DriverLibrary):
|
||||
target = messaging.Target(namespace="control", version='1.0')
|
||||
|
||||
def update_loadbalancer_status(self, **kw):
|
||||
# Do nothing
|
||||
pass
|
||||
|
||||
endpoints = [MigrationOctaviaDriverEndpoint]
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
self.octavia_rpc_server = messaging.get_rpc_server(
|
||||
transport, target, endpoints, executor='threading',
|
||||
access_policy=access_policy)
|
||||
self.octavia_rpc_server.start()
|
||||
|
||||
orig_map = self._map_orig_lb_objects(listeners, pools,
|
||||
hms, l7pols)
|
||||
total_num = len(loadbalancers)
|
||||
LOG.info("Migrating %d loadbalancer(s)", total_num)
|
||||
for lb in source_loadbalancers:
|
||||
self._migrate_octavia_lb(lb, orig_map)
|
||||
for lb in loadbalancers:
|
||||
if lb['provisioning_status'] == 'ACTIVE':
|
||||
self._migrate_octavia_lb(lb, orig_map)
|
||||
else:
|
||||
LOG.info("Skipping %s loadbalancer %s",
|
||||
lb['provisioning_status'], lb['id'])
|
||||
|
@ -73,7 +73,6 @@ class PrepareObjectForMigration(object):
|
||||
|
||||
drop_subnet_fields = basic_ignore_fields + [
|
||||
'advanced_service_providers',
|
||||
'id',
|
||||
'service_types']
|
||||
|
||||
drop_port_fields = basic_ignore_fields + [
|
||||
@ -103,14 +102,14 @@ class PrepareObjectForMigration(object):
|
||||
drop_fwaas_group_fields = ['status']
|
||||
|
||||
lb_ignore_fields = ['created_at', 'updated_at', 'operating_status',
|
||||
'provisioning_status', 'id']
|
||||
'provisioning_status']
|
||||
drop_lb_loadbalancer_fields = lb_ignore_fields + [
|
||||
'listeners', 'pools', # Those objects will be created laster
|
||||
'vip_subnet_id', # vip_port_id will be used
|
||||
'listeners', 'pools', # Those objects will be created later
|
||||
'flavor_id', # not supported by the driver
|
||||
'vip_qos_policy_id', # not supported by the driver
|
||||
]
|
||||
drop_lb_listener_fields = lb_ignore_fields + [
|
||||
'loadbalancers', 'l7policies', 'default_pool_id']
|
||||
'l7policies', 'default_pool_id']
|
||||
drop_lb_pool_fields = lb_ignore_fields + [
|
||||
'loadbalancers', 'healthmonitor_id', 'listeners', 'members']
|
||||
drop_lb_member_fields = lb_ignore_fields
|
||||
@ -192,6 +191,7 @@ class PrepareObjectForMigration(object):
|
||||
# external networks needs some special care
|
||||
if body.get('router:external'):
|
||||
fields_reset = False
|
||||
# TODO(asarfaty): map external network neutron ids to Policy tier0
|
||||
for field in ['provider:network_type', 'provider:segmentation_id',
|
||||
'provider:physical_network']:
|
||||
if field in body:
|
||||
@ -264,10 +264,6 @@ class PrepareObjectForMigration(object):
|
||||
if 'device_owner' not in body:
|
||||
body['device_owner'] = ""
|
||||
|
||||
if body.get('device_owner') == 'Octavia':
|
||||
# remove device id & owner. Octavia will re-set it.
|
||||
body['device_id'] = ""
|
||||
body['device_owner'] = ""
|
||||
return body
|
||||
|
||||
def prepare_floatingip(self, fip, direct_call=False):
|
||||
@ -297,20 +293,31 @@ class PrepareObjectForMigration(object):
|
||||
def prepare_lb_loadbalancer(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields)
|
||||
|
||||
def prepare_lb_listener(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_listener_fields)
|
||||
def prepare_lb_listener(self, listener_obj, lb_body):
|
||||
body = self.drop_fields(listener_obj, self.drop_lb_listener_fields)
|
||||
body['loadbalancer'] = lb_body
|
||||
body['loadbalancer_id'] = lb_body['id']
|
||||
return body
|
||||
|
||||
def prepare_lb_pool(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_pool_fields)
|
||||
def prepare_lb_pool(self, pool_obj, lb_body):
|
||||
body = self.drop_fields(pool_obj, self.drop_lb_pool_fields)
|
||||
body['loadbalancer'] = lb_body
|
||||
body['loadbalancer_id'] = lb_body['id']
|
||||
return body
|
||||
|
||||
def prepare_lb_member(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_member_fields)
|
||||
def prepare_lb_member(self, mem_obj, lb_body):
|
||||
body = self.drop_fields(mem_obj, self.drop_lb_member_fields)
|
||||
body['loadbalancer'] = lb_body
|
||||
body['loadbalancer_id'] = lb_body['id']
|
||||
return body
|
||||
|
||||
def prepare_lb_hm(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_hm_fields)
|
||||
|
||||
def prepare_lb_l7policy(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
|
||||
body = self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
|
||||
body['rules'] = []
|
||||
return body
|
||||
|
||||
def prepare_lb_l7rule(self, lb_obj):
|
||||
return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields)
|
||||
|
@ -37,6 +37,9 @@ RESOURCE_ATTRIBUTE_MAP = {
|
||||
'networks': {
|
||||
'id': ID_WITH_POST,
|
||||
},
|
||||
'subnets': {
|
||||
'id': ID_WITH_POST,
|
||||
},
|
||||
'security_groups': {
|
||||
'id': ID_WITH_POST,
|
||||
'name': {'allow_post': True, 'allow_put': True,
|
||||
|
@ -945,6 +945,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
|
||||
self.nsxpolicy.tier1.update(router_id,
|
||||
ipv6_ndra_profile_id=profile_id)
|
||||
|
||||
@nsx_plugin_common.api_replay_mode_wrapper
|
||||
def create_subnet(self, context, subnet):
|
||||
return self._create_subnet(context, subnet)
|
||||
|
||||
|
@ -1220,6 +1220,7 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
|
||||
LOG.warning("Failed to update network %(id)s dhcp server on "
|
||||
"the NSX: %(e)s", {'id': network['id'], 'e': e})
|
||||
|
||||
@nsx_plugin_common.api_replay_mode_wrapper
|
||||
def create_subnet(self, context, subnet):
|
||||
return self._create_subnet(context, subnet)
|
||||
|
||||
|
@ -103,8 +103,11 @@ class EdgeMemberManagerFromDict(base_mgr.NsxpLoadbalancerBaseManager):
|
||||
def create(self, context, member, completor):
|
||||
pool_client = self.core_plugin.nsxpolicy.load_balancer.lb_pool
|
||||
self._validate_member_lb_connectivity(context, member, completor)
|
||||
network = lb_utils.get_network_from_subnet(
|
||||
context, self.core_plugin, member['subnet_id'])
|
||||
if member.get('subnet_id'):
|
||||
network = lb_utils.get_network_from_subnet(
|
||||
context, self.core_plugin, member['subnet_id'])
|
||||
else:
|
||||
network = None
|
||||
if network and network.get('router:external'):
|
||||
fixed_ip = self._get_info_from_fip(context, member['address'])
|
||||
else:
|
||||
|
@ -14,7 +14,9 @@
|
||||
# under the License.
|
||||
|
||||
OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener'
|
||||
OCTAVIA_TO_DRIVER_MIGRATION_TOPIC = 'vmware_nsx__lb_listener_migration'
|
||||
DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener'
|
||||
DRIVER_TO_OCTAVIA_MIGRATION_TOPIC = 'vmware_nsx__driver_listener_migration'
|
||||
|
||||
LOADBALANCER = 'loadbalancer'
|
||||
LISTENER = 'listener'
|
||||
|
@ -44,7 +44,10 @@ class NSXOctaviaListener(object):
|
||||
loadbalancer, member, pool)
|
||||
|
||||
def _init_rpc_messaging(self):
|
||||
topic = constants.DRIVER_TO_OCTAVIA_TOPIC
|
||||
if cfg.CONF.api_replay_mode:
|
||||
topic = constants.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
|
||||
else:
|
||||
topic = constants.DRIVER_TO_OCTAVIA_TOPIC
|
||||
transport = messaging.get_rpc_transport(cfg.CONF)
|
||||
target = messaging.Target(topic=topic, exchange="common",
|
||||
namespace='control', fanout=False,
|
||||
@ -54,7 +57,10 @@ class NSXOctaviaListener(object):
|
||||
def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener,
|
||||
loadbalancer, member, pool):
|
||||
# Initialize RPC listener
|
||||
topic = constants.OCTAVIA_TO_DRIVER_TOPIC
|
||||
if cfg.CONF.api_replay_mode:
|
||||
topic = constants.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
|
||||
else:
|
||||
topic = constants.OCTAVIA_TO_DRIVER_TOPIC
|
||||
server = socket.gethostname()
|
||||
transport = messaging.get_rpc_transport(cfg.CONF)
|
||||
target = messaging.Target(topic=topic, server=server,
|
||||
@ -299,6 +305,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver loadbalancer_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def loadbalancer_delete_cascade(self, ctxt, loadbalancer):
|
||||
@ -336,6 +344,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver loadbalancer_delete_cascade failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False):
|
||||
@ -350,6 +360,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver loadbalancer_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer):
|
||||
@ -362,6 +374,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver loadbalancer_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# Listener
|
||||
@log_helpers.log_method_call
|
||||
@ -375,6 +389,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver listener_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def listener_delete(self, ctxt, listener):
|
||||
@ -386,6 +402,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver listener_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def listener_update(self, ctxt, old_listener, new_listener, cert):
|
||||
@ -398,6 +416,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver listener_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# Pool
|
||||
@log_helpers.log_method_call
|
||||
@ -410,6 +430,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver pool_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def pool_delete(self, ctxt, pool):
|
||||
@ -421,6 +443,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver pool_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def pool_update(self, ctxt, old_pool, new_pool):
|
||||
@ -432,6 +456,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver pool_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# Member
|
||||
@log_helpers.log_method_call
|
||||
@ -444,6 +470,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver member_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def member_delete(self, ctxt, member):
|
||||
@ -455,6 +483,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver member_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def member_update(self, ctxt, old_member, new_member):
|
||||
@ -466,6 +496,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver member_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# Health Monitor
|
||||
@log_helpers.log_method_call
|
||||
@ -478,6 +510,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver healthmonitor_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def healthmonitor_delete(self, ctxt, healthmonitor):
|
||||
@ -489,6 +523,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver healthmonitor_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor):
|
||||
@ -501,6 +537,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver healthmonitor_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# L7 Policy
|
||||
@log_helpers.log_method_call
|
||||
@ -513,6 +551,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7policy_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def l7policy_delete(self, ctxt, l7policy):
|
||||
@ -524,6 +564,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7policy_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def l7policy_update(self, ctxt, old_l7policy, new_l7policy):
|
||||
@ -535,6 +577,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7policy_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
# L7 Rule
|
||||
@log_helpers.log_method_call
|
||||
@ -546,6 +590,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7rule_create failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def l7rule_delete(self, ctxt, l7rule):
|
||||
@ -557,6 +603,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7rule_delete failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def l7rule_update(self, ctxt, old_l7rule, new_l7rule):
|
||||
@ -567,6 +615,8 @@ class NSXOctaviaListenerEndpoint(object):
|
||||
except Exception as e:
|
||||
LOG.error('NSX driver l7rule_update failed %s', e)
|
||||
completor(success=False)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class NSXOctaviaStatisticsCollector(object):
|
||||
|
Loading…
Reference in New Issue
Block a user