4a36519393
jsonutils provides multiple benefits in comparison to pure stdlib json (like using simplejson on Python 2.6). Similar patch was already merged before [1], but since it lacked hacking rule to enforce jsonutils usage, new occurrences of stdlib json module usage were introduced. This patch switches all the code to using jsonutils and adds a hacking rule to enforce the rule. The hacking rule requires that jsonutils module does not mimic as 'json' thru using import renames, so the code was updated not to rename the module when doing import. The hacking rule was shamelessly copied from the corresponding nova review [2]. [1]: https://review.openstack.org/#/c/99760/ [2]: https://review.openstack.org/111296/ Change-Id: Ie7a5bb76445e15cde9fbf9ff3d2101a014637b37
1117 lines
44 KiB
Python
1117 lines
44 KiB
Python
# Copyright 2013 Radware LTD.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# @author: Avishay Balderman, Radware
|
|
|
|
import base64
|
|
import copy
|
|
import httplib
|
|
import netaddr
|
|
import threading
|
|
import time
|
|
|
|
|
|
import eventlet
|
|
eventlet.monkey_patch(thread=True)
|
|
|
|
from oslo.config import cfg
|
|
from six.moves import queue as Queue
|
|
|
|
from neutron.api.v2 import attributes
|
|
from neutron.common import log as call_log
|
|
from neutron import context
|
|
from neutron.db.loadbalancer import loadbalancer_db as lb_db
|
|
from neutron.extensions import loadbalancer
|
|
from neutron.openstack.common import excutils
|
|
from neutron.openstack.common import jsonutils
|
|
from neutron.openstack.common import log as logging
|
|
from neutron.plugins.common import constants
|
|
from neutron.services.loadbalancer.drivers import abstract_driver
|
|
from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
RESP_STATUS = 0
|
|
RESP_REASON = 1
|
|
RESP_STR = 2
|
|
RESP_DATA = 3
|
|
|
|
TEMPLATE_HEADER = {'Content-Type':
|
|
'application/vnd.com.radware.vdirect.'
|
|
'template-parameters+json'}
|
|
PROVISION_HEADER = {'Content-Type':
|
|
'application/vnd.com.radware.'
|
|
'vdirect.status+json'}
|
|
CREATE_SERVICE_HEADER = {'Content-Type':
|
|
'application/vnd.com.radware.'
|
|
'vdirect.adc-service-specification+json'}
|
|
|
|
driver_opts = [
|
|
cfg.StrOpt('vdirect_address',
|
|
help=_('IP address of vDirect server.')),
|
|
cfg.StrOpt('ha_secondary_address',
|
|
help=_('IP address of secondary vDirect server.')),
|
|
cfg.StrOpt('vdirect_user',
|
|
default='vDirect',
|
|
help=_('vDirect user name.')),
|
|
cfg.StrOpt('vdirect_password',
|
|
default='radware',
|
|
help=_('vDirect user password.')),
|
|
cfg.StrOpt('service_adc_type',
|
|
default="VA",
|
|
help=_('Service ADC type. Default: VA.')),
|
|
cfg.StrOpt('service_adc_version',
|
|
default="",
|
|
help=_('Service ADC version.')),
|
|
cfg.BoolOpt('service_ha_pair',
|
|
default=False,
|
|
help=_('Enables or disables the Service HA pair. '
|
|
'Default: False.')),
|
|
cfg.IntOpt('service_throughput',
|
|
default=1000,
|
|
help=_('Service throughput. Default: 1000.')),
|
|
cfg.IntOpt('service_ssl_throughput',
|
|
default=100,
|
|
help=_('Service SSL throughput. Default: 100.')),
|
|
cfg.IntOpt('service_compression_throughput',
|
|
default=100,
|
|
help=_('Service compression throughput. Default: 100.')),
|
|
cfg.IntOpt('service_cache',
|
|
default=20,
|
|
help=_('Size of service cache. Default: 20.')),
|
|
cfg.StrOpt('l2_l3_workflow_name',
|
|
default='openstack_l2_l3',
|
|
help=_('Name of l2_l3 workflow. Default: '
|
|
'openstack_l2_l3.')),
|
|
cfg.StrOpt('l4_workflow_name',
|
|
default='openstack_l4',
|
|
help=_('Name of l4 workflow. Default: openstack_l4.')),
|
|
cfg.DictOpt('l2_l3_ctor_params',
|
|
default={"service": "_REPLACE_",
|
|
"ha_network_name": "HA-Network",
|
|
"ha_ip_pool_name": "default",
|
|
"allocate_ha_vrrp": True,
|
|
"allocate_ha_ips": True,
|
|
"twoleg_enabled": "_REPLACE_"},
|
|
help=_('Parameter for l2_l3 workflow constructor.')),
|
|
cfg.DictOpt('l2_l3_setup_params',
|
|
default={"data_port": 1,
|
|
"data_ip_address": "192.168.200.99",
|
|
"data_ip_mask": "255.255.255.0",
|
|
"gateway": "192.168.200.1",
|
|
"ha_port": 2},
|
|
help=_('Parameter for l2_l3 workflow setup.')),
|
|
cfg.ListOpt('actions_to_skip',
|
|
default=['setup_l2_l3'],
|
|
help=_('List of actions that are not pushed to '
|
|
'the completion queue.')),
|
|
cfg.StrOpt('l4_action_name',
|
|
default='BaseCreate',
|
|
help=_('Name of the l4 workflow action. '
|
|
'Default: BaseCreate.')),
|
|
cfg.ListOpt('service_resource_pool_ids',
|
|
default=[],
|
|
help=_('Resource pool IDs.')),
|
|
cfg.IntOpt('service_isl_vlan',
|
|
default=-1,
|
|
help=_('A required VLAN for the interswitch link to use.')),
|
|
cfg.BoolOpt('service_session_mirroring_enabled',
|
|
default=False,
|
|
help=_('Enable or disable Alteon interswitch link for '
|
|
'stateful session failover. Default: False.'))
|
|
]
|
|
|
|
cfg.CONF.register_opts(driver_opts, "radware")
|
|
|
|
|
|
class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
|
|
|
"""Radware lbaas driver."""
|
|
|
|
def __init__(self, plugin):
|
|
rad = cfg.CONF.radware
|
|
self.plugin = plugin
|
|
self.service = {
|
|
"haPair": rad.service_ha_pair,
|
|
"sessionMirroringEnabled": rad.service_session_mirroring_enabled,
|
|
"primary": {
|
|
"capacity": {
|
|
"throughput": rad.service_throughput,
|
|
"sslThroughput": rad.service_ssl_throughput,
|
|
"compressionThroughput":
|
|
rad.service_compression_throughput,
|
|
"cache": rad.service_cache
|
|
},
|
|
"network": {
|
|
"type": "portgroup",
|
|
"portgroups": ['DATA_NETWORK']
|
|
},
|
|
"adcType": rad.service_adc_type,
|
|
"acceptableAdc": "Exact"
|
|
}
|
|
}
|
|
if rad.service_resource_pool_ids:
|
|
ids = rad.service_resource_pool_ids
|
|
self.service['resourcePoolIds'] = [
|
|
{'name': id} for id in ids
|
|
]
|
|
if rad.service_isl_vlan:
|
|
self.service['islVlan'] = rad.service_isl_vlan
|
|
self.l2_l3_wf_name = rad.l2_l3_workflow_name
|
|
self.l4_wf_name = rad.l4_workflow_name
|
|
self.l2_l3_ctor_params = rad.l2_l3_ctor_params
|
|
self.l2_l3_setup_params = rad.l2_l3_setup_params
|
|
self.l4_action_name = rad.l4_action_name
|
|
self.actions_to_skip = rad.actions_to_skip
|
|
vdirect_address = rad.vdirect_address
|
|
sec_server = rad.ha_secondary_address
|
|
self.rest_client = vDirectRESTClient(server=vdirect_address,
|
|
secondary_server=sec_server,
|
|
user=rad.vdirect_user,
|
|
password=rad.vdirect_password)
|
|
self.queue = Queue.Queue()
|
|
self.completion_handler = OperationCompletionHandler(self.queue,
|
|
self.rest_client,
|
|
plugin)
|
|
self.workflow_templates_exists = False
|
|
self.completion_handler.setDaemon(True)
|
|
self.completion_handler_started = False
|
|
|
|
def _populate_vip_graph(self, context, vip):
|
|
ext_vip = self.plugin.populate_vip_graph(context, vip)
|
|
vip_network_id = self._get_vip_network_id(context, ext_vip)
|
|
pool_network_id = self._get_pool_network_id(context, ext_vip)
|
|
|
|
# if VIP and PIP are different, we need an IP address for the PIP
|
|
# so create port on PIP's network and use its IP address
|
|
if vip_network_id != pool_network_id:
|
|
pip_address = self._get_pip(
|
|
context,
|
|
vip['tenant_id'],
|
|
_make_pip_name_from_vip(vip),
|
|
pool_network_id,
|
|
ext_vip['pool']['subnet_id'])
|
|
ext_vip['pip_address'] = pip_address
|
|
else:
|
|
ext_vip['pip_address'] = vip['address']
|
|
|
|
ext_vip['vip_network_id'] = vip_network_id
|
|
ext_vip['pool_network_id'] = pool_network_id
|
|
return ext_vip
|
|
|
|
def create_vip(self, context, vip):
|
|
log_info = {'vip': vip,
|
|
'extended_vip': 'NOT_ASSIGNED',
|
|
'service_name': 'NOT_ASSIGNED'}
|
|
try:
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
|
|
service_name = self._get_service(ext_vip)
|
|
log_info['extended_vip'] = ext_vip
|
|
log_info['service_name'] = service_name
|
|
|
|
self._create_workflow(
|
|
vip['pool_id'], self.l4_wf_name,
|
|
{"service": service_name})
|
|
self._update_workflow(
|
|
vip['pool_id'],
|
|
self.l4_action_name, ext_vip, context)
|
|
|
|
finally:
|
|
LOG.debug(_('vip: %(vip)s, '
|
|
'extended_vip: %(extended_vip)s, '
|
|
'service_name: %(service_name)s, '),
|
|
log_info)
|
|
|
|
def update_vip(self, context, old_vip, vip):
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
self._update_workflow(
|
|
vip['pool_id'], self.l4_action_name,
|
|
ext_vip, context, False, lb_db.Vip, vip['id'])
|
|
|
|
def delete_vip(self, context, vip):
|
|
"""Delete a Vip
|
|
|
|
First delete it from the device. If deletion ended OK
|
|
- remove data from DB as well.
|
|
If the deletion failed - mark vip with error status in DB
|
|
|
|
"""
|
|
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
params = _translate_vip_object_graph(ext_vip,
|
|
self.plugin, context)
|
|
ids = params.pop('__ids__')
|
|
|
|
try:
|
|
# get neutron port id associated with the vip (present if vip and
|
|
# pip are different) and release it after workflow removed
|
|
port_filter = {
|
|
'name': [_make_pip_name_from_vip(vip)],
|
|
}
|
|
ports = self.plugin._core_plugin.get_ports(context,
|
|
filters=port_filter)
|
|
if ports:
|
|
LOG.debug(_('Retrieved pip nport: %(port)r for '
|
|
'vip: %(vip)s'), {'port': ports[0],
|
|
'vip': vip['id']})
|
|
|
|
delete_pip_nport_function = self._get_delete_pip_nports(
|
|
context, ports)
|
|
else:
|
|
delete_pip_nport_function = None
|
|
LOG.debug(_('Found no pip nports associated with '
|
|
'vip: %s'), vip['id'])
|
|
|
|
# removing the WF will cause deletion of the configuration from the
|
|
# device
|
|
self._remove_workflow(ids, context, delete_pip_nport_function)
|
|
|
|
except r_exc.RESTRequestFailure:
|
|
pool_id = ext_vip['pool_id']
|
|
LOG.exception(_('Failed to remove workflow %s. '
|
|
'Going to set vip to ERROR status'),
|
|
pool_id)
|
|
|
|
self.plugin.update_status(context, lb_db.Vip, ids['vip'],
|
|
constants.ERROR)
|
|
|
|
def _get_delete_pip_nports(self, context, ports):
|
|
def _delete_pip_nports(success):
|
|
if success:
|
|
for port in ports:
|
|
try:
|
|
self.plugin._core_plugin.delete_port(
|
|
context, port['id'])
|
|
LOG.debug(_('pip nport id: %s'), port['id'])
|
|
except Exception as exception:
|
|
# stop exception propagation, nport may have
|
|
# been deleted by other means
|
|
LOG.warning(_('pip nport delete failed: %r'),
|
|
exception)
|
|
return _delete_pip_nports
|
|
|
|
def create_pool(self, context, pool):
|
|
# nothing to do
|
|
pass
|
|
|
|
def update_pool(self, context, old_pool, pool):
|
|
self._handle_pool(context, pool)
|
|
|
|
def delete_pool(self, context, pool,):
|
|
self._handle_pool(context, pool, delete=True)
|
|
|
|
def _handle_pool(self, context, pool, delete=False):
|
|
vip_id = self.plugin.get_pool(context, pool['id']).get('vip_id', None)
|
|
if vip_id:
|
|
if delete:
|
|
raise loadbalancer.PoolInUse(pool_id=pool['id'])
|
|
else:
|
|
vip = self.plugin.get_vip(context, vip_id)
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
self._update_workflow(
|
|
pool['id'], self.l4_action_name,
|
|
ext_vip, context, delete, lb_db.Pool, pool['id'])
|
|
else:
|
|
if delete:
|
|
self.plugin._delete_db_pool(context, pool['id'])
|
|
else:
|
|
# we keep the pool in PENDING_UPDATE
|
|
# no point to modify it since it is not connected to vip yet
|
|
pass
|
|
|
|
def create_member(self, context, member):
|
|
self._handle_member(context, member)
|
|
|
|
def update_member(self, context, old_member, member):
|
|
self._handle_member(context, member)
|
|
|
|
def delete_member(self, context, member):
|
|
self._handle_member(context, member, delete=True)
|
|
|
|
def _handle_member(self, context, member, delete=False):
|
|
"""Navigate the model. If a Vip is found - activate a bulk WF action.
|
|
"""
|
|
vip_id = self.plugin.get_pool(
|
|
context, member['pool_id']).get('vip_id')
|
|
if vip_id:
|
|
vip = self.plugin.get_vip(context, vip_id)
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
self._update_workflow(
|
|
member['pool_id'], self.l4_action_name,
|
|
ext_vip, context,
|
|
delete, lb_db.Member, member['id'])
|
|
# We have to delete this member but it is not connected to a vip yet
|
|
elif delete:
|
|
self.plugin._delete_db_member(context, member['id'])
|
|
|
|
def create_health_monitor(self, context, health_monitor):
|
|
# Anything to do here? the hm is not connected to the graph yet
|
|
pass
|
|
|
|
def update_pool_health_monitor(self, context, old_health_monitor,
|
|
health_monitor,
|
|
pool_id):
|
|
self._handle_pool_health_monitor(context, health_monitor, pool_id)
|
|
|
|
def create_pool_health_monitor(self, context,
|
|
health_monitor, pool_id):
|
|
self._handle_pool_health_monitor(context, health_monitor, pool_id)
|
|
|
|
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
|
|
self._handle_pool_health_monitor(context, health_monitor, pool_id,
|
|
True)
|
|
|
|
def _handle_pool_health_monitor(self, context, health_monitor,
|
|
pool_id, delete=False):
|
|
"""Push a graph to vDirect
|
|
|
|
Navigate the model. Check if a pool is associated to the vip
|
|
and push the graph to vDirect
|
|
|
|
"""
|
|
|
|
vip_id = self.plugin.get_pool(context, pool_id).get('vip_id', None)
|
|
|
|
debug_params = {"hm_id": health_monitor['id'], "pool_id": pool_id,
|
|
"delete": delete, "vip_id": vip_id}
|
|
LOG.debug(_('_handle_pool_health_monitor. health_monitor = %(hm_id)s '
|
|
'pool_id = %(pool_id)s delete = %(delete)s '
|
|
'vip_id = %(vip_id)s'),
|
|
debug_params)
|
|
|
|
if vip_id:
|
|
vip = self.plugin.get_vip(context, vip_id)
|
|
ext_vip = self._populate_vip_graph(context, vip)
|
|
self._update_workflow(pool_id, self.l4_action_name,
|
|
ext_vip, context,
|
|
delete, lb_db.PoolMonitorAssociation,
|
|
health_monitor['id'])
|
|
elif delete:
|
|
self.plugin._delete_db_pool_health_monitor(context,
|
|
health_monitor['id'],
|
|
pool_id)
|
|
|
|
def stats(self, context, pool_id):
|
|
# TODO(avishayb) implement
|
|
return {"bytes_in": 0,
|
|
"bytes_out": 0,
|
|
"active_connections": 0,
|
|
"total_connections": 0}
|
|
|
|
def _get_vip_network_id(self, context, extended_vip):
|
|
subnet = self.plugin._core_plugin.get_subnet(
|
|
context, extended_vip['subnet_id'])
|
|
return subnet['network_id']
|
|
|
|
def _start_completion_handling_thread(self):
|
|
if not self.completion_handler_started:
|
|
LOG.info(_('Starting operation completion handling thread'))
|
|
self.completion_handler.start()
|
|
self.completion_handler_started = True
|
|
|
|
def _get_pool_network_id(self, context, extended_vip):
|
|
subnet = self.plugin._core_plugin.get_subnet(
|
|
context, extended_vip['pool']['subnet_id'])
|
|
return subnet['network_id']
|
|
|
|
@call_log.log
|
|
def _update_workflow(self, wf_name, action,
|
|
wf_params, context,
|
|
delete=False,
|
|
lbaas_entity=None, entity_id=None):
|
|
"""Update the WF state. Push the result to a queue for processing."""
|
|
|
|
if not self.workflow_templates_exists:
|
|
self._verify_workflow_templates()
|
|
|
|
if action not in self.actions_to_skip:
|
|
params = _translate_vip_object_graph(wf_params,
|
|
self.plugin,
|
|
context)
|
|
else:
|
|
params = wf_params
|
|
|
|
resource = '/api/workflow/%s/action/%s' % (wf_name, action)
|
|
response = _rest_wrapper(self.rest_client.call('POST', resource,
|
|
{'parameters': params},
|
|
TEMPLATE_HEADER))
|
|
LOG.debug(_('_update_workflow response: %s '), response)
|
|
|
|
if action not in self.actions_to_skip:
|
|
ids = params.pop('__ids__', None)
|
|
oper = OperationAttributes(response['uri'],
|
|
ids,
|
|
lbaas_entity,
|
|
entity_id,
|
|
delete=delete)
|
|
LOG.debug(_('Pushing operation %s to the queue'), oper)
|
|
|
|
self._start_completion_handling_thread()
|
|
self.queue.put_nowait(oper)
|
|
|
|
def _remove_workflow(self, ids, context, post_remove_function):
|
|
|
|
wf_name = ids['pool']
|
|
LOG.debug(_('Remove the workflow %s') % wf_name)
|
|
resource = '/api/workflow/%s' % (wf_name)
|
|
rest_return = self.rest_client.call('DELETE', resource, None, None)
|
|
response = _rest_wrapper(rest_return, [204, 202, 404])
|
|
if rest_return[RESP_STATUS] in [404]:
|
|
if post_remove_function:
|
|
try:
|
|
post_remove_function(True)
|
|
LOG.debug(_('Post-remove workflow function '
|
|
'%r completed'), post_remove_function)
|
|
except Exception:
|
|
with excutils.save_and_reraise_exception():
|
|
LOG.exception(_('Post-remove workflow function '
|
|
'%r failed'), post_remove_function)
|
|
self.plugin._delete_db_vip(context, ids['vip'])
|
|
else:
|
|
oper = OperationAttributes(
|
|
response['uri'],
|
|
ids,
|
|
lb_db.Vip,
|
|
ids['vip'],
|
|
delete=True,
|
|
post_op_function=post_remove_function)
|
|
LOG.debug(_('Pushing operation %s to the queue'), oper)
|
|
|
|
self._start_completion_handling_thread()
|
|
self.queue.put_nowait(oper)
|
|
|
|
def _remove_service(self, service_name):
|
|
resource = '/api/service/%s' % (service_name)
|
|
_rest_wrapper(self.rest_client.call('DELETE',
|
|
resource, None, None),
|
|
[202])
|
|
|
|
def _get_service(self, ext_vip):
|
|
"""Get a service name.
|
|
|
|
if you can't find one,
|
|
create a service and create l2_l3 WF.
|
|
|
|
"""
|
|
if not self.workflow_templates_exists:
|
|
self._verify_workflow_templates()
|
|
if ext_vip['vip_network_id'] != ext_vip['pool_network_id']:
|
|
networks_name = '%s_%s' % (ext_vip['vip_network_id'],
|
|
ext_vip['pool_network_id'])
|
|
self.l2_l3_ctor_params["twoleg_enabled"] = True
|
|
else:
|
|
networks_name = ext_vip['vip_network_id']
|
|
self.l2_l3_ctor_params["twoleg_enabled"] = False
|
|
incoming_service_name = 'srv_%s' % (networks_name,)
|
|
service_name = self._get_available_service(incoming_service_name)
|
|
if not service_name:
|
|
LOG.debug(
|
|
'Could not find a service named ' + incoming_service_name)
|
|
service_name = self._create_service(ext_vip['vip_network_id'],
|
|
ext_vip['pool_network_id'],
|
|
ext_vip['tenant_id'])
|
|
self.l2_l3_ctor_params["service"] = incoming_service_name
|
|
wf_name = 'l2_l3_' + networks_name
|
|
self._create_workflow(
|
|
wf_name, self.l2_l3_wf_name, self.l2_l3_ctor_params)
|
|
self._update_workflow(
|
|
wf_name, "setup_l2_l3", self.l2_l3_setup_params, None)
|
|
else:
|
|
LOG.debug('A service named ' + service_name + ' was found.')
|
|
return service_name
|
|
|
|
def _create_service(self, vip_network_id, pool_network_id, tenant_id):
|
|
"""create the service and provision it (async)."""
|
|
# 1) create the service
|
|
service = copy.deepcopy(self.service)
|
|
if vip_network_id != pool_network_id:
|
|
service_name = 'srv_%s_%s' % (vip_network_id, pool_network_id)
|
|
service['primary']['network']['portgroups'] = [vip_network_id,
|
|
pool_network_id]
|
|
else:
|
|
service_name = 'srv_' + vip_network_id
|
|
service['primary']['network']['portgroups'] = [vip_network_id]
|
|
resource = '/api/service?name=%s&tenant=%s' % (service_name, tenant_id)
|
|
|
|
response = _rest_wrapper(self.rest_client.call('POST', resource,
|
|
service,
|
|
CREATE_SERVICE_HEADER), [201])
|
|
|
|
# 2) provision the service
|
|
provision_uri = response['links']['actions']['provision']
|
|
_rest_wrapper(self.rest_client.call('POST', provision_uri,
|
|
None, PROVISION_HEADER))
|
|
return service_name
|
|
|
|
def _get_available_service(self, service_name):
|
|
"""Check if service exists and return its name if it does."""
|
|
resource = '/api/service/' + service_name
|
|
try:
|
|
_rest_wrapper(self.rest_client.call('GET',
|
|
resource,
|
|
None, None), [200])
|
|
except Exception:
|
|
return
|
|
return service_name
|
|
|
|
def _workflow_exists(self, pool_id):
|
|
"""Check if a WF having the name of the pool_id exists."""
|
|
resource = '/api/workflow/' + pool_id
|
|
try:
|
|
_rest_wrapper(self.rest_client.call('GET',
|
|
resource,
|
|
None,
|
|
None), [200])
|
|
except Exception:
|
|
return False
|
|
return True
|
|
|
|
def _create_workflow(self, wf_name, wf_template_name,
|
|
create_workflow_params=None):
|
|
"""Create a WF if it doesn't exists yet."""
|
|
if not self.workflow_templates_exists:
|
|
self._verify_workflow_templates()
|
|
if not self._workflow_exists(wf_name):
|
|
if not create_workflow_params:
|
|
create_workflow_params = {}
|
|
resource = '/api/workflowTemplate/%s?name=%s' % (
|
|
wf_template_name, wf_name)
|
|
params = {'parameters': create_workflow_params}
|
|
response = _rest_wrapper(self.rest_client.call('POST',
|
|
resource,
|
|
params,
|
|
TEMPLATE_HEADER))
|
|
LOG.debug(_('create_workflow response: %s'), str(response))
|
|
|
|
def _verify_workflow_templates(self):
|
|
"""Verify the existence of workflows on vDirect server."""
|
|
workflows = {self.l2_l3_wf_name:
|
|
False, self.l4_wf_name: False}
|
|
resource = '/api/workflowTemplate'
|
|
response = _rest_wrapper(self.rest_client.call('GET',
|
|
resource,
|
|
None,
|
|
None), [200])
|
|
for wf in workflows.keys():
|
|
for wf_template in response:
|
|
if wf == wf_template['name']:
|
|
workflows[wf] = True
|
|
break
|
|
for wf, found in workflows.items():
|
|
if not found:
|
|
raise r_exc.WorkflowMissing(workflow=wf)
|
|
self.workflow_templates_exists = True
|
|
|
|
def _get_pip(self, context, tenant_id, port_name,
|
|
network_id, subnet_id):
|
|
"""Get proxy IP
|
|
|
|
Creates or get port on network_id, returns that port's IP
|
|
on the subnet_id.
|
|
"""
|
|
|
|
port_filter = {
|
|
'name': [port_name],
|
|
}
|
|
ports = self.plugin._core_plugin.get_ports(context,
|
|
filters=port_filter)
|
|
if not ports:
|
|
# create port, we just want any IP allocated to the port
|
|
# based on the network id and subnet_id
|
|
port_data = {
|
|
'tenant_id': tenant_id,
|
|
'name': port_name,
|
|
'network_id': network_id,
|
|
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
|
'admin_state_up': False,
|
|
'device_id': '',
|
|
'device_owner': 'neutron:' + constants.LOADBALANCER,
|
|
'fixed_ips': [{'subnet_id': subnet_id}]
|
|
}
|
|
port = self.plugin._core_plugin.create_port(context,
|
|
{'port': port_data})
|
|
else:
|
|
port = ports[0]
|
|
ips_on_subnet = [ip for ip in port['fixed_ips']
|
|
if ip['subnet_id'] == subnet_id]
|
|
if not ips_on_subnet:
|
|
raise Exception(_('Could not find or allocate '
|
|
'IP address for subnet id %s'),
|
|
subnet_id)
|
|
else:
|
|
return ips_on_subnet[0]['ip_address']
|
|
|
|
|
|
class vDirectRESTClient:
|
|
"""REST server proxy to Radware vDirect."""
|
|
|
|
def __init__(self,
|
|
server='localhost',
|
|
secondary_server=None,
|
|
user=None,
|
|
password=None,
|
|
port=2189,
|
|
ssl=True,
|
|
timeout=5000,
|
|
base_uri=''):
|
|
self.server = server
|
|
self.secondary_server = secondary_server
|
|
self.port = port
|
|
self.ssl = ssl
|
|
self.base_uri = base_uri
|
|
self.timeout = timeout
|
|
if user and password:
|
|
self.auth = base64.encodestring('%s:%s' % (user, password))
|
|
self.auth = self.auth.replace('\n', '')
|
|
else:
|
|
raise r_exc.AuthenticationMissing()
|
|
|
|
debug_params = {'server': self.server,
|
|
'sec_server': self.secondary_server,
|
|
'port': self.port,
|
|
'ssl': self.ssl}
|
|
LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
|
|
'secondary server=%(sec_server)s, '
|
|
'port=%(port)d, '
|
|
'ssl=%(ssl)r'), debug_params)
|
|
|
|
def _flip_servers(self):
|
|
LOG.warning(_('Fliping servers. Current is: %(server)s, '
|
|
'switching to %(secondary)s'),
|
|
{'server': self.server,
|
|
'secondary': self.secondary_server})
|
|
self.server, self.secondary_server = self.secondary_server, self.server
|
|
|
|
def _recover(self, action, resource, data, headers, binary=False):
|
|
if self.server and self.secondary_server:
|
|
self._flip_servers()
|
|
resp = self._call(action, resource, data,
|
|
headers, binary)
|
|
return resp
|
|
else:
|
|
LOG.exception(_('REST client is not able to recover '
|
|
'since only one vDirect server is '
|
|
'configured.'))
|
|
return -1, None, None, None
|
|
|
|
def call(self, action, resource, data, headers, binary=False):
|
|
resp = self._call(action, resource, data, headers, binary)
|
|
if resp[RESP_STATUS] == -1:
|
|
LOG.warning(_('vDirect server is not responding (%s).'),
|
|
self.server)
|
|
return self._recover(action, resource, data, headers, binary)
|
|
elif resp[RESP_STATUS] in (301, 307):
|
|
LOG.warning(_('vDirect server is not active (%s).'),
|
|
self.server)
|
|
return self._recover(action, resource, data, headers, binary)
|
|
else:
|
|
return resp
|
|
|
|
@call_log.log
|
|
def _call(self, action, resource, data, headers, binary=False):
|
|
if resource.startswith('http'):
|
|
uri = resource
|
|
else:
|
|
uri = self.base_uri + resource
|
|
if binary:
|
|
body = data
|
|
else:
|
|
body = jsonutils.dumps(data)
|
|
|
|
debug_data = 'binary' if binary else body
|
|
debug_data = debug_data if debug_data else 'EMPTY'
|
|
if not headers:
|
|
headers = {'Authorization': 'Basic %s' % self.auth}
|
|
else:
|
|
headers['Authorization'] = 'Basic %s' % self.auth
|
|
conn = None
|
|
if self.ssl:
|
|
conn = httplib.HTTPSConnection(
|
|
self.server, self.port, timeout=self.timeout)
|
|
if conn is None:
|
|
LOG.error(_('vdirectRESTClient: Could not establish HTTPS '
|
|
'connection'))
|
|
return 0, None, None, None
|
|
else:
|
|
conn = httplib.HTTPConnection(
|
|
self.server, self.port, timeout=self.timeout)
|
|
if conn is None:
|
|
LOG.error(_('vdirectRESTClient: Could not establish HTTP '
|
|
'connection'))
|
|
return 0, None, None, None
|
|
|
|
try:
|
|
conn.request(action, uri, body, headers)
|
|
response = conn.getresponse()
|
|
respstr = response.read()
|
|
respdata = respstr
|
|
try:
|
|
respdata = jsonutils.loads(respstr)
|
|
except ValueError:
|
|
# response was not JSON, ignore the exception
|
|
pass
|
|
ret = (response.status, response.reason, respstr, respdata)
|
|
except Exception as e:
|
|
log_dict = {'action': action, 'e': e}
|
|
LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
|
|
log_dict)
|
|
ret = -1, None, None, None
|
|
conn.close()
|
|
return ret
|
|
|
|
|
|
class OperationAttributes:
|
|
|
|
"""Holds operation attributes.
|
|
|
|
The parameter 'post_op_function' (if supplied) is a function that takes
|
|
one boolean argument, specifying the success of the operation
|
|
|
|
"""
|
|
|
|
def __init__(self,
|
|
operation_url,
|
|
object_graph,
|
|
lbaas_entity=None,
|
|
entity_id=None,
|
|
delete=False,
|
|
post_op_function=None):
|
|
self.operation_url = operation_url
|
|
self.object_graph = object_graph
|
|
self.delete = delete
|
|
self.lbaas_entity = lbaas_entity
|
|
self.entity_id = entity_id
|
|
self.creation_time = time.time()
|
|
self.post_op_function = post_op_function
|
|
|
|
def __repr__(self):
|
|
items = ("%s = %r" % (k, v) for k, v in self.__dict__.items())
|
|
return "<%s: {%s}>" % (self.__class__.__name__, ', '.join(items))
|
|
|
|
|
|
class OperationCompletionHandler(threading.Thread):
|
|
|
|
"""Update DB with operation status or delete the entity from DB."""
|
|
|
|
def __init__(self, queue, rest_client, plugin):
|
|
threading.Thread.__init__(self)
|
|
self.queue = queue
|
|
self.rest_client = rest_client
|
|
self.plugin = plugin
|
|
self.stoprequest = threading.Event()
|
|
self.opers_to_handle_before_rest = 0
|
|
|
|
def join(self, timeout=None):
|
|
self.stoprequest.set()
|
|
super(OperationCompletionHandler, self).join(timeout)
|
|
|
|
def handle_operation_completion(self, oper):
|
|
result = self.rest_client.call('GET',
|
|
oper.operation_url,
|
|
None,
|
|
None)
|
|
completed = result[RESP_DATA]['complete']
|
|
reason = result[RESP_REASON],
|
|
description = result[RESP_STR]
|
|
if completed:
|
|
# operation is done - update the DB with the status
|
|
# or delete the entire graph from DB
|
|
success = result[RESP_DATA]['success']
|
|
sec_to_completion = time.time() - oper.creation_time
|
|
debug_data = {'oper': oper,
|
|
'sec_to_completion': sec_to_completion,
|
|
'success': success}
|
|
LOG.debug(_('Operation %(oper)s is completed after '
|
|
'%(sec_to_completion)d sec '
|
|
'with success status: %(success)s :'),
|
|
debug_data)
|
|
db_status = None
|
|
if not success:
|
|
# failure - log it and set the return ERROR as DB state
|
|
if reason or description:
|
|
msg = 'Reason:%s. Description:%s' % (reason, description)
|
|
else:
|
|
msg = "unknown"
|
|
error_params = {"operation": oper, "msg": msg}
|
|
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
|
|
error_params)
|
|
db_status = constants.ERROR
|
|
else:
|
|
if oper.delete:
|
|
_remove_object_from_db(self.plugin, oper)
|
|
else:
|
|
db_status = constants.ACTIVE
|
|
|
|
if db_status:
|
|
_update_vip_graph_status(self.plugin, oper, db_status)
|
|
|
|
OperationCompletionHandler._run_post_op_function(success, oper)
|
|
|
|
return completed
|
|
|
|
def run(self):
|
|
while not self.stoprequest.isSet():
|
|
try:
|
|
oper = self.queue.get(timeout=1)
|
|
|
|
# Get the current queue size (N) and set the counter with it.
|
|
# Handle N operations with no intermission.
|
|
# Once N operations handles, get the size again and repeat.
|
|
if self.opers_to_handle_before_rest <= 0:
|
|
self.opers_to_handle_before_rest = self.queue.qsize() + 1
|
|
|
|
LOG.debug('Operation consumed from the queue: ' +
|
|
str(oper))
|
|
# check the status - if oper is done: update the db ,
|
|
# else push the oper again to the queue
|
|
if not self.handle_operation_completion(oper):
|
|
LOG.debug(_('Operation %s is not completed yet..') % oper)
|
|
# Not completed - push to the queue again
|
|
self.queue.put_nowait(oper)
|
|
|
|
self.queue.task_done()
|
|
self.opers_to_handle_before_rest -= 1
|
|
|
|
# Take one second rest before start handling
|
|
# new operations or operations handled before
|
|
if self.opers_to_handle_before_rest <= 0:
|
|
time.sleep(1)
|
|
|
|
except Queue.Empty:
|
|
continue
|
|
except Exception:
|
|
m = _("Exception was thrown inside OperationCompletionHandler")
|
|
LOG.exception(m)
|
|
|
|
@staticmethod
|
|
def _run_post_op_function(success, oper):
|
|
if oper.post_op_function:
|
|
log_data = {'func': oper.post_op_function, 'oper': oper}
|
|
try:
|
|
oper.post_op_function(success)
|
|
LOG.debug(_('Post-operation function '
|
|
'%(func)r completed '
|
|
'after operation %(oper)r'),
|
|
log_data)
|
|
except Exception:
|
|
with excutils.save_and_reraise_exception():
|
|
LOG.exception(_('Post-operation function '
|
|
'%(func)r failed '
|
|
'after operation %(oper)r'),
|
|
log_data)
|
|
|
|
|
|
def _rest_wrapper(response, success_codes=[202]):
|
|
"""Wrap a REST call and make sure a valid status is returned."""
|
|
if not response:
|
|
raise r_exc.RESTRequestFailure(
|
|
status=-1,
|
|
reason="Unknown",
|
|
description="Unknown",
|
|
success_codes=success_codes
|
|
)
|
|
elif response[RESP_STATUS] not in success_codes:
|
|
raise r_exc.RESTRequestFailure(
|
|
status=response[RESP_STATUS],
|
|
reason=response[RESP_REASON],
|
|
description=response[RESP_STR],
|
|
success_codes=success_codes
|
|
)
|
|
else:
|
|
return response[RESP_DATA]
|
|
|
|
|
|
def _make_pip_name_from_vip(vip):
|
|
"""Standard way of making PIP name based on VIP ID."""
|
|
return 'pip_' + vip['id']
|
|
|
|
|
|
def _update_vip_graph_status(plugin, oper, status):
|
|
"""Update the status
|
|
|
|
Of all the Vip object graph
|
|
or a specific entity in the graph.
|
|
|
|
"""
|
|
|
|
ctx = context.get_admin_context(load_admin_roles=False)
|
|
|
|
LOG.debug(_('_update: %s '), oper)
|
|
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
|
|
plugin.update_pool_health_monitor(ctx,
|
|
oper.entity_id,
|
|
oper.object_graph['pool'],
|
|
status)
|
|
elif oper.entity_id:
|
|
plugin.update_status(ctx,
|
|
oper.lbaas_entity,
|
|
oper.entity_id,
|
|
status)
|
|
else:
|
|
_update_vip_graph_status_cascade(plugin,
|
|
oper.object_graph,
|
|
ctx, status)
|
|
|
|
|
|
def _update_vip_graph_status_cascade(plugin, ids, ctx, status):
|
|
plugin.update_status(ctx,
|
|
lb_db.Vip,
|
|
ids['vip'],
|
|
status)
|
|
plugin.update_status(ctx,
|
|
lb_db.Pool,
|
|
ids['pool'],
|
|
status)
|
|
for member_id in ids['members']:
|
|
plugin.update_status(ctx,
|
|
lb_db.Member,
|
|
member_id,
|
|
status)
|
|
for hm_id in ids['health_monitors']:
|
|
plugin.update_pool_health_monitor(ctx,
|
|
hm_id,
|
|
ids['pool'],
|
|
status)
|
|
|
|
|
|
def _remove_object_from_db(plugin, oper):
|
|
"""Remove a specific entity from db."""
|
|
LOG.debug(_('_remove_object_from_db %s'), str(oper))
|
|
|
|
ctx = context.get_admin_context(load_admin_roles=False)
|
|
|
|
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
|
|
plugin._delete_db_pool_health_monitor(ctx,
|
|
oper.entity_id,
|
|
oper.object_graph['pool'])
|
|
elif oper.lbaas_entity == lb_db.Member:
|
|
plugin._delete_db_member(ctx, oper.entity_id)
|
|
elif oper.lbaas_entity == lb_db.Vip:
|
|
plugin._delete_db_vip(ctx, oper.entity_id)
|
|
elif oper.lbaas_entity == lb_db.Pool:
|
|
plugin._delete_db_pool(ctx, oper.entity_id)
|
|
else:
|
|
raise r_exc.UnsupportedEntityOperation(
|
|
operation='Remove from DB', entity=oper.lbaas_entity
|
|
)
|
|
|
|
TRANSLATION_DEFAULTS = {'session_persistence_type': 'none',
|
|
'session_persistence_cookie_name': 'none',
|
|
'url_path': '/',
|
|
'http_method': 'GET',
|
|
'expected_codes': '200',
|
|
'subnet': '255.255.255.255',
|
|
'mask': '255.255.255.255',
|
|
'gw': '255.255.255.255',
|
|
}
|
|
VIP_PROPERTIES = ['address', 'protocol_port', 'protocol', 'connection_limit',
|
|
'admin_state_up', 'session_persistence_type',
|
|
'session_persistence_cookie_name']
|
|
POOL_PROPERTIES = ['protocol', 'lb_method', 'admin_state_up']
|
|
MEMBER_PROPERTIES = ['address', 'protocol_port', 'weight', 'admin_state_up',
|
|
'subnet', 'mask', 'gw']
|
|
HEALTH_MONITOR_PROPERTIES = ['type', 'delay', 'timeout', 'max_retries',
|
|
'admin_state_up', 'url_path', 'http_method',
|
|
'expected_codes', 'id']
|
|
|
|
|
|
def _translate_vip_object_graph(extended_vip, plugin, context):
|
|
"""Translate the extended vip
|
|
|
|
translate to a structure that can be
|
|
understood by the workflow.
|
|
|
|
"""
|
|
def _create_key(prefix, property_name):
|
|
return prefix + '_' + property_name + '_array'
|
|
|
|
def _trans_prop_name(prop_name):
|
|
if prop_name == 'id':
|
|
return 'uuid'
|
|
else:
|
|
return prop_name
|
|
|
|
def get_ids(extended_vip):
|
|
ids = {}
|
|
ids['vip'] = extended_vip['id']
|
|
ids['pool'] = extended_vip['pool']['id']
|
|
ids['members'] = [m['id'] for m in extended_vip['members']]
|
|
ids['health_monitors'] = [
|
|
hm['id'] for hm in extended_vip['health_monitors']
|
|
]
|
|
return ids
|
|
|
|
trans_vip = {}
|
|
LOG.debug('Vip graph to be translated: ' + str(extended_vip))
|
|
for vip_property in VIP_PROPERTIES:
|
|
trans_vip['vip_' + vip_property] = extended_vip.get(
|
|
vip_property, TRANSLATION_DEFAULTS.get(vip_property))
|
|
for pool_property in POOL_PROPERTIES:
|
|
trans_vip['pool_' + pool_property] = extended_vip[
|
|
'pool'][pool_property]
|
|
for member_property in MEMBER_PROPERTIES:
|
|
trans_vip[_create_key('member', member_property)] = []
|
|
|
|
two_leg = (extended_vip['pip_address'] != extended_vip['address'])
|
|
if two_leg:
|
|
pool_subnet = plugin._core_plugin.get_subnet(
|
|
context, extended_vip['pool']['subnet_id'])
|
|
|
|
for member in extended_vip['members']:
|
|
if member['status'] != constants.PENDING_DELETE:
|
|
if (two_leg and netaddr.IPAddress(member['address'])
|
|
not in netaddr.IPNetwork(pool_subnet['cidr'])):
|
|
member_ports = plugin._core_plugin.get_ports(
|
|
context,
|
|
filters={'fixed_ips': {'ip_address': [member['address']]},
|
|
'tenant_id': [extended_vip['tenant_id']]})
|
|
if len(member_ports) == 1:
|
|
member_subnet = plugin._core_plugin.get_subnet(
|
|
context,
|
|
member_ports[0]['fixed_ips'][0]['subnet_id'])
|
|
member_network = netaddr.IPNetwork(member_subnet['cidr'])
|
|
member['subnet'] = str(member_network.network)
|
|
member['mask'] = str(member_network.netmask)
|
|
else:
|
|
member['subnet'] = member['address']
|
|
|
|
member['gw'] = pool_subnet['gateway_ip']
|
|
|
|
for member_property in MEMBER_PROPERTIES:
|
|
trans_vip[_create_key('member', member_property)].append(
|
|
member.get(member_property,
|
|
TRANSLATION_DEFAULTS.get(member_property)))
|
|
|
|
for hm_property in HEALTH_MONITOR_PROPERTIES:
|
|
trans_vip[
|
|
_create_key('hm', _trans_prop_name(hm_property))] = []
|
|
for hm in extended_vip['health_monitors']:
|
|
hm_pool = plugin.get_pool_health_monitor(context,
|
|
hm['id'],
|
|
extended_vip['pool']['id'])
|
|
if hm_pool['status'] != constants.PENDING_DELETE:
|
|
for hm_property in HEALTH_MONITOR_PROPERTIES:
|
|
value = hm.get(hm_property,
|
|
TRANSLATION_DEFAULTS.get(hm_property))
|
|
trans_vip[_create_key('hm',
|
|
_trans_prop_name(hm_property))].append(value)
|
|
ids = get_ids(extended_vip)
|
|
trans_vip['__ids__'] = ids
|
|
for key in ['pip_address']:
|
|
if key in extended_vip:
|
|
trans_vip[key] = extended_vip[key]
|
|
LOG.debug('Translated Vip graph: ' + str(trans_vip))
|
|
return trans_vip
|