diff --git a/etc/neutron.conf b/etc/neutron.conf
index 5abf669175..2dd42c48c8 100644
--- a/etc/neutron.conf
+++ b/etc/neutron.conf
@@ -364,3 +364,16 @@ signing_dir = $state_path/keystone-signing
 # service_provider=FIREWALL:name2:firewall_driver_path
 # --- Reference implementations ---
 service_provider=LOADBALANCER:Haproxy:neutron.services.loadbalancer.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default
+# To activate Radware's lbaas driver, uncomment the service_provider line below.
+# To keep HAProxy as the default lbaas driver, remove the ':default' attribute from the line below;
+# otherwise, comment out the HAProxy line above.
+#service_provider = LOADBALANCER:Radware:neutron.services.loadbalancer.drivers.radware.driver.LoadBalancerDriver:default
+
+[radware]
+#vdirect_address=0.0.0.0
+#service_ha_pair=False
+#service_throughput=1000
+#service_ssl_throughput=200
+#service_compression_throughput=100
+#service_cache=20
+
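To make the choice concrete: the two supported layouts differ only in which provider carries the ':default' suffix (at most one provider per service type may be the default). A sketch of each, keeping the HAProxy line from above:

    # Option 1 - Radware becomes the default provider:
    service_provider=LOADBALANCER:Haproxy:neutron.services.loadbalancer.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver
    service_provider=LOADBALANCER:Radware:neutron.services.loadbalancer.drivers.radware.driver.LoadBalancerDriver:default

    # Option 2 - HAProxy stays the default, Radware is selectable by name:
    service_provider=LOADBALANCER:Haproxy:neutron.services.loadbalancer.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default
    service_provider=LOADBALANCER:Radware:neutron.services.loadbalancer.drivers.radware.driver.LoadBalancerDriver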
diff --git a/neutron/services/loadbalancer/drivers/radware/__init__.py b/neutron/services/loadbalancer/drivers/radware/__init__.py
new file mode 100644
index 0000000000..253de5544c
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/__init__.py
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 Radware LLC (Radware)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Avishay Balderman, Radware
diff --git a/neutron/services/loadbalancer/drivers/radware/driver.py b/neutron/services/loadbalancer/drivers/radware/driver.py
new file mode 100644
index 0000000000..ca126a20c2
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/driver.py
@@ -0,0 +1,833 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 Radware LTD.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Avishay Balderman, Radware
+
+import base64
+import copy
+import httplib
+import os
+import Queue
+import socket
+from StringIO import StringIO
+import threading
+import time
+from zipfile import ZipFile
+
+import eventlet
+from oslo.config import cfg
+
+from neutron.common import exceptions as q_exc
+from neutron.common import log as call_log
+from neutron import context as qcontext
+import neutron.db.loadbalancer.loadbalancer_db as lb_db
+from neutron.extensions import loadbalancer
+from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants
+from neutron.services.loadbalancer.drivers import abstract_driver
+
+eventlet.monkey_patch(thread=True)
+
+LOG = logging.getLogger(__name__)
+
+RESP_STATUS = 0
+RESP_REASON = 1
+RESP_STR = 2
+RESP_DATA = 3
+
+L2_L3_WORKFLOW_TEMPLATE_NAME = 'openstack_l2_l3'
+L4_WORKFLOW_TEMPLATE_NAME = 'openstack_l4'
+
+ACTIONS_TO_SKIP = ['setup_l2_l3']
+
+L4_ACTION_NAME = 'BaseCreate'
+
+TEMPLATE_HEADER = {'Content-Type':
+                   'application/vnd.com.radware.vdirect.'
+                   'template-parameters+json'}
+PROVISION_HEADER = {'Content-Type':
+                    'application/vnd.com.radware.'
+                    'vdirect.status+json'}
+CREATE_SERVICE_HEADER = {'Content-Type':
+                         'application/vnd.com.radware.'
+                         'vdirect.adc-service-specification+json'}
+ZIP_HEADER = {'Content-Type': 'application/x-zip-compressed'}
+
+L2_CTOR_PARAMS = {"service": "_REPLACE_", "ha_network_name": "HA-Network",
+                  "ha_ip_pool_name": "default", "allocate_ha_vrrp": True,
+                  "allocate_ha_ips": True}
+L2_SETUP_L2_L3_PARAMS = {"data_port": 1,
+                         "data_ip_address": "192.168.200.99",
+                         "data_ip_mask": "255.255.255.0",
+                         "gateway": "192.168.200.1",
+                         "ha_port": 2}
+
+driver_opts = [
+    cfg.StrOpt('vdirect_address',
+               help=_('vdirect server IP address')),
+    cfg.BoolOpt('service_ha_pair',
+                default=False,
+                help=_('service HA pair')),
+    cfg.IntOpt('service_throughput',
+               default=1000,
+               help=_('service throughput')),
+    cfg.IntOpt('service_ssl_throughput',
+               default=100,
+               help=_('service ssl throughput')),
+    cfg.IntOpt('service_compression_throughput',
+               default=100,
+               help=_('service compression throughput')),
+    cfg.IntOpt('service_cache',
+               default=20,
+               help=_('service cache'))
+]
+
+cfg.CONF.register_opts(driver_opts, "radware")
+
+
+class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
+
+    """Radware lbaas driver."""
+
+    def __init__(self, plugin):
+        rad = cfg.CONF.radware
+        self.plugin = plugin
+        self.service = {
+            "haPair": rad.service_ha_pair,
+            "primary": {
+                "capacity": {
+                    "throughput": rad.service_throughput,
+                    "sslThroughput": rad.service_ssl_throughput,
+                    "compressionThroughput":
+                    rad.service_compression_throughput,
+                    "cache": rad.service_cache
+                },
+                "network": {
+                    "type": "portgroup",
+                    "portgroups": ['DATA_NETWORK']
+                },
+                "adcType": "VA",
+                "acceptableAdc": "Exact"
+            }
+        }
+        vdirect_address = cfg.CONF.radware.vdirect_address
+        self.rest_client = vDirectRESTClient(server=vdirect_address)
+        self.queue = Queue.Queue()
+        self.completion_handler = OperationCompletionHandler(self.queue,
+                                                             self.rest_client,
+                                                             plugin)
+        self.workflows_were_uploaded = False
+        self.completion_handler.setDaemon(True)
+        self.completion_handler.start()
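+    # NOTE: the driver is asynchronous by design. The CRUD hooks below
+    # return once the REST request is accepted by vDirect; an
+    # OperationAttributes record is then queued, and the daemon
+    # OperationCompletionHandler thread started here polls vDirect and
+    # moves the Neutron DB objects to ACTIVE/ERROR (or deletes them)
+    # when the operation completes.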
+    def create_vip(self, context, vip):
+        LOG.debug(_('create_vip. vip: %s'), str(vip))
+        extended_vip = self.plugin.populate_vip_graph(context, vip)
+        LOG.debug(_('create_vip. extended_vip: %s'), str(extended_vip))
+        network_id = self._get_vip_network_id(context, extended_vip)
+        LOG.debug(_('create_vip. network_id: %s '), str(network_id))
+        service_name = self._get_service(extended_vip['pool_id'], network_id)
+        LOG.debug(_('create_vip. service_name: %s '), service_name)
+        self._create_workflow(
+            vip['pool_id'], L4_WORKFLOW_TEMPLATE_NAME,
+            {"service": service_name})
+        self._update_workflow(
+            vip['pool_id'],
+            L4_ACTION_NAME, extended_vip)
+
+    def update_vip(self, context, old_vip, vip):
+        extended_vip = self.plugin.populate_vip_graph(context, vip)
+        self._update_workflow(
+            vip['pool_id'], L4_ACTION_NAME,
+            extended_vip, False, lb_db.Vip, vip['id'])
+
+    def delete_vip(self, context, vip):
+        """Delete a Vip
+
+        First delete it from the device. If the deletion ended OK,
+        remove the data from the DB as well.
+        If the deletion failed, mark the elements with an error
+        status in the DB.
+
+        """
+        extended_vip = self.plugin.populate_vip_graph(context, vip)
+        try:
+            # removing the WF will cause deletion of the configuration
+            # from the device
+            self._remove_workflow(extended_vip, context)
+        except Exception:
+            pool_id = extended_vip['pool_id']
+            LOG.exception(_('Failed to remove workflow %s'), pool_id)
+            _update_vip_graph_status(
+                self.plugin, context, extended_vip, constants.ERROR
+            )
+
+    def create_pool(self, context, pool):
+        # nothing to do
+        pass
+
+    def update_pool(self, context, old_pool, pool):
+        self._handle_pool(context, pool)
+
+    def delete_pool(self, context, pool):
+        self._handle_pool(context, pool, delete=True)
+
+    def _handle_pool(self, context, pool, delete=False):
+        vip_id = self.plugin.get_pool(context, pool['id']).get('vip_id', None)
+        if vip_id:
+            if delete:
+                raise loadbalancer.PoolInUse(pool_id=pool['id'])
+            else:
+                vip = self.plugin.get_vip(context, vip_id)
+                extended_vip = self.plugin.populate_vip_graph(context, vip)
+                self._update_workflow(
+                    pool['id'], L4_ACTION_NAME,
+                    extended_vip, delete, lb_db.Pool, pool['id'])
+        else:
+            if delete:
+                self.plugin._delete_db_pool(context, pool['id'])
+            else:
+                # we keep the pool in PENDING_UPDATE;
+                # no point in modifying it since it is not connected
+                # to a vip yet
+                pass
+
+    def create_member(self, context, member):
+        self._handle_member(context, member)
+
+    def update_member(self, context, old_member, member):
+        self._handle_member(context, member)
+
+    def delete_member(self, context, member):
+        self._handle_member(context, member, delete=True)
+
+    def _handle_member(self, context, member, delete=False):
+        """Navigate the model. If a Vip is found - activate a bulk WF action.
+        """
+        vip_id = self.plugin.get_pool(
+            context, member['pool_id']).get('vip_id')
+        if vip_id:
+            vip = self.plugin.get_vip(context, vip_id)
+            extended_vip = self.plugin.populate_vip_graph(context, vip)
+            self._update_workflow(
+                member['pool_id'], L4_ACTION_NAME,
+                extended_vip, delete, lb_db.Member, member['id'])
+        # We have to delete this member, but it is not connected to a vip yet
+        elif delete:
+            self.plugin._delete_db_member(context, member['id'])
+    def create_health_monitor(self, context, health_monitor):
+        # nothing to do here - the hm is not connected to the graph yet
+        pass
+
+    def update_health_monitor(self, context, old_health_monitor,
+                              health_monitor,
+                              pool_id):
+        self._handle_pool_health_monitor(context, health_monitor, pool_id)
+
+    def create_pool_health_monitor(self, context,
+                                   health_monitor, pool_id):
+        self._handle_pool_health_monitor(context, health_monitor, pool_id)
+
+    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+        self._handle_pool_health_monitor(context, health_monitor, pool_id,
+                                         True)
+
+    def _handle_pool_health_monitor(self, context, health_monitor,
+                                    pool_id, delete=False):
+        """Push a graph to vDirect
+
+        Navigate the model. Check if a pool is associated with the vip
+        and push the graph to vDirect.
+
+        """
+        vip_id = self.plugin.get_pool(context, pool_id).get('vip_id', None)
+
+        debug_params = {"hm_id": health_monitor['id'], "pool_id": pool_id,
+                        "delete": delete, "vip_id": vip_id}
+        LOG.debug(_('_handle_pool_health_monitor. health_monitor = %(hm_id)s '
+                    'pool_id = %(pool_id)s delete = %(delete)s '
+                    'vip_id = %(vip_id)s'),
+                  debug_params)
+
+        if vip_id:
+            vip = self.plugin.get_vip(context, vip_id)
+            extended_vip = self.plugin.populate_vip_graph(context, vip)
+            self._update_workflow(pool_id, L4_ACTION_NAME,
+                                  extended_vip,
+                                  delete, lb_db.PoolMonitorAssociation,
+                                  health_monitor['id'])
+        elif delete:
+            self.plugin._delete_db_pool_health_monitor(context,
+                                                       health_monitor['id'],
+                                                       pool_id)
+
+    def stats(self, context, pool_id):
+        # TODO(avishayb) implement
+        return {"bytes_in": 0,
+                "bytes_out": 0,
+                "active_connections": 0,
+                "total_connections": 0}
+
+    def _get_vip_network_id(self, context, extended_vip):
+        subnet = self.plugin._core_plugin.get_subnet(
+            context, extended_vip['subnet_id'])
+        return subnet['network_id']
+
+    @call_log.log
+    def _update_workflow(self, wf_name, action, wf_params, delete=False,
+                         lbaas_entity=None, entity_id=None):
+        """Update the WF state. Push the result to a queue for processing."""
+        if not self.workflows_were_uploaded:
+            self._upload_workflows_templates()
+
+        if action not in ACTIONS_TO_SKIP:
+            params = _translate_vip_object_graph(wf_params)
+        else:
+            params = wf_params
+
+        resource = '/api/workflow/%s/action/%s' % (wf_name, action)
+        response = _rest_wrapper(self.rest_client.call('POST', resource,
+                                 {'parameters': params},
+                                 TEMPLATE_HEADER))
+        LOG.debug(_('_update_workflow response: %s '), response)
+
+        if action not in ACTIONS_TO_SKIP:
+            ids = params.pop('__ids__', None)
+            if not ids:
+                raise q_exc.NeutronException(
+                    _('params must contain __ids__ field!')
+                )
+
+            oper = OperationAttributes(response['uri'],
+                                       ids,
+                                       lbaas_entity,
+                                       entity_id,
+                                       delete=delete)
+            LOG.debug(_('Pushing operation %s to the queue'), oper)
+            self.queue.put_nowait(oper)
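+    # For illustration, updating the graph of a vip whose pool id names the
+    # workflow results in a call shaped like (values are hypothetical):
+    #
+    #   POST /api/workflow/<pool_id>/action/BaseCreate
+    #   {"parameters": {"vip_address": "10.0.0.5",
+    #                   "pool_lb_method": "ROUND_ROBIN",
+    #                   "member_address_array": ["10.0.0.11"], ...,
+    #                   "__ids__": {...}}}
+    #
+    # vDirect acknowledges with an operation URI that the completion
+    # handler polls until the action finishes.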
+    def _remove_workflow(self, wf_params, context):
+        params = _translate_vip_object_graph(wf_params)
+        ids = params.pop('__ids__', None)
+        if not ids:
+            raise q_exc.NeutronException(
+                _('params must contain __ids__ field!')
+            )
+
+        wf_name = ids['pool']
+        LOG.debug(_('Remove the workflow %s') % wf_name)
+        resource = '/api/workflow/%s' % (wf_name)
+        response = _rest_wrapper(self.rest_client.call('DELETE', resource,
+                                                       None, None),
+                                 [204, 202, 404])
+        msg = response.get('message', None)
+        if msg == "Not Found":
+            self.plugin._delete_db_vip(context, ids['vip'])
+        else:
+            oper = OperationAttributes(response['uri'],
+                                       ids,
+                                       lb_db.Vip,
+                                       ids['vip'],
+                                       delete=True)
+            LOG.debug(_('Pushing operation %s to the queue'), oper)
+            self.queue.put_nowait(oper)
+
+    def _remove_service(self, service_name):
+        resource = '/api/service/%s' % (service_name)
+        _rest_wrapper(self.rest_client.call('DELETE',
+                                            resource, None, None),
+                      [202])
+
+    def _get_service(self, pool_id, network_id):
+        """Get a service name.
+
+        If a service cannot be found, create one and set up
+        the l2_l3 workflow.
+
+        """
+        incoming_service_name = 'srv_' + network_id
+        service_name = self._get_available_service(incoming_service_name)
+        if not service_name:
+            LOG.debug(
+                'Could not find a service named ' + incoming_service_name)
+            service_name = self._create_service(pool_id, network_id)
+            L2_CTOR_PARAMS["service"] = incoming_service_name
+            wf_name = 'l2_l3_' + network_id
+            if not self.workflows_were_uploaded:
+                self._upload_workflows_templates()
+            self._create_workflow(
+                wf_name, L2_L3_WORKFLOW_TEMPLATE_NAME, L2_CTOR_PARAMS)
+            self._update_workflow(
+                wf_name, "setup_l2_l3", L2_SETUP_L2_L3_PARAMS)
+        else:
+            LOG.debug('A service named ' + service_name + ' was found.')
+        return service_name
+ + """ + incoming_service_name = 'srv_' + network_id + service_name = self._get_available_service(incoming_service_name) + if not service_name: + LOG.debug( + 'Could not find a service named ' + incoming_service_name) + service_name = self._create_service(pool_id, network_id) + L2_CTOR_PARAMS["service"] = incoming_service_name + wf_name = 'l2_l3_' + network_id + if not self.workflows_were_uploaded: + self._upload_workflows_templates() + self._create_workflow( + wf_name, L2_L3_WORKFLOW_TEMPLATE_NAME, L2_CTOR_PARAMS) + self._update_workflow( + wf_name, "setup_l2_l3", L2_SETUP_L2_L3_PARAMS) + else: + LOG.debug('A service named ' + service_name + ' was found.') + return service_name + + def _create_service(self, pool_id, network_id): + """create the service and provision it (async).""" + # 1) create the service + service_name = 'srv_' + network_id + resource = '/api/service?name=%s' % service_name + + service = copy.deepcopy(self.service) + service['primary']['network']['portgroups'] = [network_id] + + response = _rest_wrapper(self.rest_client.call('POST', resource, + service, + CREATE_SERVICE_HEADER), [201]) + + # 2) provision the service + provision_uri = response['links']['actions']['provision'] + _rest_wrapper(self.rest_client.call('POST', provision_uri, + None, PROVISION_HEADER)) + return service_name + + def _get_available_service(self, service_name): + """Check if service exsists and return its name if it does.""" + resource = '/api/service/' + service_name + try: + _rest_wrapper(self.rest_client.call('GET', + resource, + None, None), [200]) + except Exception: + return + return service_name + + def _workflow_exists(self, pool_id): + """Check if a WF having the name of the pool_id exists.""" + resource = '/api/workflow/' + pool_id + try: + _rest_wrapper(self.rest_client.call('GET', + resource, + None, + None), [200]) + except Exception: + return False + return True + + def _create_workflow(self, wf_name, wf_template_name, + create_workflow_params=None): + """Create a WF if it doesnt exists yet.""" + if not self.workflows_were_uploaded: + self._upload_workflows_templates() + if not self._workflow_exists(wf_name): + if not create_workflow_params: + create_workflow_params = {} + resource = '/api/workflowTemplate/%s?name=%s' % ( + wf_template_name, wf_name) + params = {'parameters': create_workflow_params} + response = _rest_wrapper(self.rest_client.call('POST', + resource, + params, + TEMPLATE_HEADER)) + LOG.debug(_('create_workflow response: %s'), str(response)) + + def _upload_workflows_templates(self): + """Upload the driver workflows to vDirect server.""" + workflows = {L2_L3_WORKFLOW_TEMPLATE_NAME: + False, L4_WORKFLOW_TEMPLATE_NAME: False} + resource = '/api/workflowTemplate' + response = _rest_wrapper(self.rest_client.call('GET', + resource, + None, + None), [200]) + for wf in workflows.keys(): + for wf_template in response: + if wf == wf_template['name']: + workflows[wf] = True + break + for wf, found in workflows.items(): + if not found: + self._upload_workflow_template(wf) + self.workflows_were_uploaded = True + + def _upload_workflow_template(self, wf_template_name): + """Upload a wf template to vDirect server.""" + def _get_folders(): + current_folder = os.path.dirname(os.path.realpath(__file__)) + folders = [current_folder + '/workflows/' + wf_template_name, + current_folder + '/workflows/common'] + return folders + + LOG.debug(_('About to upload wf template named %s.zip'), + wf_template_name) + data = self._get_workflow_zip_data(_get_folders()) + 
+    def _get_available_service(self, service_name):
+        """Check if the service exists and return its name if it does."""
+        resource = '/api/service/' + service_name
+        try:
+            _rest_wrapper(self.rest_client.call('GET',
+                                                resource,
+                                                None, None), [200])
+        except Exception:
+            return
+        return service_name
+
+    def _workflow_exists(self, pool_id):
+        """Check if a WF having the name of the pool_id exists."""
+        resource = '/api/workflow/' + pool_id
+        try:
+            _rest_wrapper(self.rest_client.call('GET',
+                                                resource,
+                                                None,
+                                                None), [200])
+        except Exception:
+            return False
+        return True
+
+    def _create_workflow(self, wf_name, wf_template_name,
+                         create_workflow_params=None):
+        """Create a WF if it does not exist yet."""
+        if not self.workflows_were_uploaded:
+            self._upload_workflows_templates()
+        if not self._workflow_exists(wf_name):
+            if not create_workflow_params:
+                create_workflow_params = {}
+            resource = '/api/workflowTemplate/%s?name=%s' % (
+                wf_template_name, wf_name)
+            params = {'parameters': create_workflow_params}
+            response = _rest_wrapper(self.rest_client.call('POST',
+                                                           resource,
+                                                           params,
+                                                           TEMPLATE_HEADER))
+            LOG.debug(_('create_workflow response: %s'), str(response))
+
+    def _upload_workflows_templates(self):
+        """Upload the driver workflows to the vDirect server."""
+        workflows = {L2_L3_WORKFLOW_TEMPLATE_NAME:
+                     False, L4_WORKFLOW_TEMPLATE_NAME: False}
+        resource = '/api/workflowTemplate'
+        response = _rest_wrapper(self.rest_client.call('GET',
+                                                       resource,
+                                                       None,
+                                                       None), [200])
+        for wf in workflows.keys():
+            for wf_template in response:
+                if wf == wf_template['name']:
+                    workflows[wf] = True
+                    break
+        for wf, found in workflows.items():
+            if not found:
+                self._upload_workflow_template(wf)
+        self.workflows_were_uploaded = True
+
+    def _upload_workflow_template(self, wf_template_name):
+        """Upload a wf template to the vDirect server."""
+        def _get_folders():
+            current_folder = os.path.dirname(os.path.realpath(__file__))
+            folders = [current_folder + '/workflows/' + wf_template_name,
+                       current_folder + '/workflows/common']
+            return folders
+
+        LOG.debug(_('About to upload wf template named %s.zip'),
+                  wf_template_name)
+        data = self._get_workflow_zip_data(_get_folders())
+        _rest_wrapper(self.rest_client.call('POST',
+                                            '/api/workflowTemplate',
+                                            data,
+                                            ZIP_HEADER, binary=True), [201])
+
+    def _get_workflow_zip_data(self, folders):
+        """Create a zip file on the fly and return its content."""
+        def _file_to_zip(f):
+            n, ext = os.path.splitext(f)
+            LOG.debug("file name = " + n + " ext = " + ext)
+            return f == 'workflow.xml' or ext in ['.vm', '.groovy']
+
+        in_memory_file = StringIO()
+        zip_file = ZipFile(in_memory_file, 'w')
+        LOG.debug(_('Folders are %s'), folders)
+        for folder in folders:
+            LOG.debug(_('Folder is %s'), folder)
+            for root, dirs, files in os.walk(folder):
+                for file in files:
+                    if _file_to_zip(file):
+                        LOG.debug(_('About to add file %s to zip'), str(file))
+                        LOG.debug(_('Path: %s'), os.path.join(root, file))
+                        zip_file.write(os.path.join(root, file),
+                                       os.path.basename(file))
+                        LOG.debug(_('File %s was added to zip'), str(file))
+        zip_file.close()
+        return in_memory_file.getvalue()
+
+
+class vDirectRESTClient:
+    """REST server proxy to Radware vDirect."""
+
+    def __init__(self,
+                 server='localhost',
+                 port=2188,
+                 ssl=None,
+                 auth=None,
+                 timeout=5000,
+                 base_uri=''):
+        self.server = server
+        self.port = port
+        self.ssl = ssl
+        self.base_uri = base_uri
+        self.timeout = timeout
+        self.auth = None
+        if auth:
+            self.auth = 'Basic ' + base64.encodestring(auth).strip()
+        debug_params = {'server': self.server,
+                        'port': self.port,
+                        'ssl': self.ssl}
+        LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
+                    'port=%(port)d, '
+                    'ssl=%(ssl)r'), debug_params)
+
+    @call_log.log
+    def call(self, action, resource, data, headers, binary=False):
+        if resource.startswith('http'):
+            uri = resource
+        else:
+            uri = self.base_uri + resource
+        if binary:
+            body = data
+        else:
+            body = json.dumps(data)
+
+        debug_data = 'binary' if binary else body
+        debug_data = debug_data if debug_data else 'EMPTY'
+        if not headers:
+            headers = {}
+
+        conn = None
+        if self.ssl:
+            conn = httplib.HTTPSConnection(
+                self.server, self.port, timeout=self.timeout)
+            if conn is None:
+                LOG.error(_('vdirectRESTClient: Could not establish HTTPS '
+                            'connection'))
+                return 0, None, None, None
+        else:
+            conn = httplib.HTTPConnection(
+                self.server, self.port, timeout=self.timeout)
+            if conn is None:
+                LOG.error(_('vdirectRESTClient: Could not establish HTTP '
+                            'connection'))
+                return 0, None, None, None
+
+        try:
+            conn.request(action, uri, body, headers)
+            response = conn.getresponse()
+            respstr = response.read()
+            respdata = respstr
+            try:
+                respdata = json.loads(respstr)
+            except ValueError:
+                # response was not JSON, ignore the exception
+                pass
+            ret = (response.status, response.reason, respstr, respdata)
+        except (socket.timeout, socket.error) as e:
+            log_dict = {'action': action, 'e': e}
+            LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
+                      log_dict)
+            ret = 0, None, None, None
+        conn.close()
+        return ret
+
+
+class OperationAttributes:
+
+    """Holds operation attributes."""
+
+    def __init__(self,
+                 operation_url,
+                 object_graph,
+                 lbaas_entity=None,
+                 entity_id=None,
+                 delete=False):
+        self.operation_url = operation_url
+        self.object_graph = object_graph
+        self.delete = delete
+        self.lbaas_entity = lbaas_entity
+        self.entity_id = entity_id
+        self.creation_time = time.time()
+
+    def __repr__(self):
+        items = ("%s = %r" % (k, v) for k, v in self.__dict__.items())
+        return "<%s: {%s}>" % (self.__class__.__name__, ', '.join(items))
+
+
+class OperationCompletionHandler(threading.Thread):
+
+    """Update the DB with the operation status, or delete the entity."""
+
+    def __init__(self, queue, rest_client, plugin):
+        threading.Thread.__init__(self)
+        self.queue = queue
+        self.rest_client = rest_client
+        self.admin_ctx = qcontext.get_admin_context()
+        self.plugin = plugin
+        self.stoprequest = threading.Event()
+
+    def _get_db_status(self, operation, success, messages=None):
+        """Get the db_status based on the status of the vdirect operation."""
+        if not success:
+            # we have a failure - log it and set ERROR as the DB state
+            msg = ', '.join(messages) if messages else "unknown"
+            error_params = {"operation": operation, "msg": msg}
+            LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
+                      error_params)
+            return constants.ERROR
+        if operation.delete:
+            return None
+        else:
+            return constants.ACTIVE
+
+    def join(self, timeout=None):
+        self.stoprequest.set()
+        super(OperationCompletionHandler, self).join(timeout)
+
+    def run(self):
+        oper = None
+        while not self.stoprequest.isSet():
+            try:
+                oper = self.queue.get(timeout=1)
+                LOG.debug('Operation consumed from the queue: ' +
+                          str(oper))
+                # check the status - if oper is done: update the db,
+                # else push the oper again to the queue
+                result = self.rest_client.call('GET',
+                                               oper.operation_url,
+                                               None,
+                                               None)
+                completed = result[RESP_DATA]['complete']
+                if completed:
+                    # operation is done - update the DB with the status
+                    # or delete the entire graph from the DB
+                    success = result[RESP_DATA]['success']
+                    sec_to_completion = time.time() - oper.creation_time
+                    debug_data = {'oper': oper,
+                                  'sec_to_completion': sec_to_completion,
+                                  'success': success}
+                    LOG.debug(_('Operation %(oper)s is completed after '
+                                '%(sec_to_completion)d sec '
+                                'with success status: %(success)s :'),
+                              debug_data)
+                    db_status = self._get_db_status(oper, success)
+                    if db_status:
+                        _update_vip_graph_status(
+                            self.plugin, self.admin_ctx,
+                            oper, db_status)
+                    else:
+                        _remove_object_from_db(
+                            self.plugin, self.admin_ctx, oper)
+                else:
+                    # not completed - push to the queue again
+                    LOG.debug(_('Operation %s is not completed yet..') % oper)
+                    # queue is empty - let's take a short rest
+                    if self.queue.empty():
+                        time.sleep(1)
+                    self.queue.put_nowait(oper)
+                # send a signal to the queue that the job is done
+                self.queue.task_done()
+            except Queue.Empty:
+                continue
+            except Exception:
+                m = _("Exception was thrown inside "
+                      "OperationCompletionHandler")
+                LOG.exception(m)
+
+
+def _rest_wrapper(response, success_codes=[202]):
+    """Wrap a REST call and make sure a valid status is returned."""
+    if response[RESP_STATUS] not in success_codes:
+        raise q_exc.NeutronException(str(response[RESP_STATUS]) + ':' +
+                                     response[RESP_REASON] +
+                                     '. Error description: ' +
+                                     response[RESP_STR])
+    else:
+        return response[RESP_DATA]
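+
+# For example (illustrative), a call that vDirect rejects with 409 surfaces
+# as an exception:
+#
+#   _rest_wrapper(client.call('DELETE', '/api/service/srv_x', None, None))
+#   -> NeutronException: "409:Conflict. Error description: ..."
+#
+# whereas an accepted call simply returns the decoded JSON body (RESP_DATA).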
+ + """ + + LOG.debug(_('_update: %s '), oper) + if oper.lbaas_entity == lb_db.PoolMonitorAssociation: + plugin.update_pool_health_monitor(context, + oper.entity_id, + oper.object_graph['pool'], + status) + elif oper.entity_id: + plugin.update_status(context, + oper.lbaas_entity, + oper.entity_id, + status) + else: + # update the whole vip graph status + plugin.update_status(context, + lb_db.Vip, + oper.object_graph['vip'], + status) + plugin.update_status(context, + lb_db.Pool, + oper.object_graph['pool'], + status) + for member_id in oper.object_graph['members']: + plugin.update_status(context, + lb_db.Member, + member_id, + status) + for hm_id in oper.object_graph['health_monitors']: + plugin.update_pool_health_monitor(context, + hm_id, + oper.object_graph['pool'], + status) + + +def _remove_object_from_db(plugin, context, oper): + """Remove a specific entity from db.""" + LOG.debug(_('_remove_object_from_db %s'), str(oper)) + if oper.lbaas_entity == lb_db.PoolMonitorAssociation: + plugin._delete_db_pool_health_monitor(context, + oper.entity_id, + oper.object_graph['pool_id']) + elif oper.lbaas_entity == lb_db.Member: + plugin._delete_db_member(context, oper.entity_id) + elif oper.lbaas_entity == lb_db.Vip: + plugin._delete_db_vip(context, oper.entity_id) + elif oper.lbaas_entity == lb_db.Pool: + plugin._delete_db_pool(context, oper.entity_id) + else: + raise q_exc.NeutronException( + _('Tried to remove unsupported lbaas entity %s!'), + str(oper.lbaas_entity) + ) + +TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP', + 'session_persistence_cookie_name': 'none', + 'url_path': '/', + 'http_method': 'GET', + 'expected_codes': '200' + } +VIP_PROPERTIES = ['address', 'protocol_port', 'protocol', 'connection_limit', + 'admin_state_up', 'session_persistence_type', + 'session_persistence_cookie_name'] +POOL_PROPERTIES = ['protocol', 'lb_method', 'admin_state_up'] +MEMBER_PROPERTIES = ['address', 'protocol_port', 'weight', 'admin_state_up'] +HEALTH_MONITOR_PROPERTIES = ['type', 'delay', 'timeout', 'max_retries', + 'admin_state_up', 'url_path', 'http_method', + 'expected_codes', 'id'] + + +def _translate_vip_object_graph(extended_vip): + """Translate the extended vip + + translate to a structure that can be + understood by the workflow. 
+ + """ + def _create_key(prefix, property_name): + return prefix + '_' + property_name + '_array' + + def _trans_prop_name(prop_name): + if prop_name == 'id': + return 'uuid' + else: + return prop_name + + def get_ids(extended_vip): + ids = {} + ids['vip'] = extended_vip['id'] + ids['pool'] = extended_vip['pool']['id'] + ids['members'] = [m['id'] for m in extended_vip['members']] + ids['health_monitors'] = [ + hm['id'] for hm in extended_vip['health_monitors'] + ] + return ids + + trans_vip = {} + LOG.debug('Vip graph to be translated: ' + str(extended_vip)) + for vip_property in VIP_PROPERTIES: + trans_vip['vip_' + vip_property] = extended_vip.get( + vip_property, TRANSLATION_DEFAULTS.get(vip_property)) + for pool_property in POOL_PROPERTIES: + trans_vip['pool_' + pool_property] = extended_vip[ + 'pool'][pool_property] + for member_property in MEMBER_PROPERTIES: + trans_vip[_create_key('member', member_property)] = [] + for member in extended_vip['members']: + for member_property in MEMBER_PROPERTIES: + trans_vip[_create_key('member', member_property)].append( + member.get(member_property, + TRANSLATION_DEFAULTS.get(member_property))) + for hm_property in HEALTH_MONITOR_PROPERTIES: + trans_vip[ + _create_key('hm', _trans_prop_name(hm_property))] = [] + for hm in extended_vip['health_monitors']: + for hm_property in HEALTH_MONITOR_PROPERTIES: + value = hm.get(hm_property, + TRANSLATION_DEFAULTS.get(hm_property)) + trans_vip[_create_key('hm', + _trans_prop_name(hm_property))].append(value) + ids = get_ids(extended_vip) + trans_vip['__ids__'] = ids + LOG.debug('Translated Vip graph: ' + str(trans_vip)) + return trans_vip + + +def _drop_pending_delete_elements(extended_vip): + """Traverse the Vip object graph and drop PENDEING_DELETE nodes.""" + # What if the pool is pendening_delete? 
+
+
+def _drop_pending_delete_elements(extended_vip):
+    """Traverse the Vip object graph and drop PENDING_DELETE nodes."""
+    # What if the pool itself is in PENDING_DELETE?
+    extended_vip['health_monitors'] = [
+        hm for hm in extended_vip['health_monitors']
+        if hm['status'] != constants.PENDING_DELETE
+    ]
+    extended_vip['members'] = [
+        member for member in extended_vip['members']
+        if member['status'] != constants.PENDING_DELETE
+    ]
+
+    return extended_vip
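The driver above is never invoked directly: Neutron's LBaaS plugin loads it through the service_provider entry shown in etc/neutron.conf and calls the CRUD hooks, passing itself as `plugin`. A minimal sketch of the resulting call order for a new VIP (illustrative only; `plugin` and `ctx` stand in for the LBaaS plugin instance and request context):

    driver = LoadBalancerDriver(plugin)
    driver.create_pool(ctx, pool)        # no-op: nothing to push until a vip exists
    driver.create_vip(ctx, vip)          # ensures the srv_<network_id> service and
                                         # the l2_l3_<network_id> and <pool_id> WFs
    driver.create_member(ctx, member)    # re-pushes the whole vip graph as a bulk action
    driver.stats(ctx, pool['id'])        # placeholder zeros (TODO in this patch)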
diff --git a/neutron/services/loadbalancer/drivers/radware/workflows/common/groovy/wait_for_service.groovy b/neutron/services/loadbalancer/drivers/radware/workflows/common/groovy/wait_for_service.groovy
new file mode 100644
index 0000000000..c5f6414e43
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/workflows/common/groovy/wait_for_service.groovy
@@ -0,0 +1,51 @@
+import com.radware.alteon.beans.adc.*;
+import com.radware.alteon.api.*;
+import com.radware.alteon.sdk.*
+import com.radware.alteon.sdk.rpm.*
+import com.radware.alteon.api.impl.AlteonCliUtils;
+import com.radware.alteon.cli.CliSession;
+
+
+
+service.provision()
+
+//
+// temporary patch until provision() guarantees that SSH is active:
+// sleep up to 10 min (COUNTER_MAX * SLEEP_TIME = 600 seconds)
+
+counter = 0
+logger.info("Start waiting for SSH connection.")
+COUNTER_MAX = 300
+SLEEP_TIME = 2000
+
+while (counter < COUNTER_MAX) {
+    try {
+        validateAdcCLIConnection(service.getPrimary());
+        logger.info("Validated primary (" + counter + ")")
+        if (service.request.ha) {
+            validateAdcCLIConnection(service.getSecondary());
+            logger.info("Validated secondary (" + counter + ")")
+        }
+        break
+    } catch (Exception e) {
+        counter++
+        sleep(SLEEP_TIME)
+    }
+}
+
+if(counter >= COUNTER_MAX) {
+    throw new Exception("Could not validate SSH connection after " + (COUNTER_MAX * SLEEP_TIME) / 1000 + " seconds.")
+}
+
+logger.info("Validated SSH connection.")
+
+def validateAdcCLIConnection(AdcCLIConnection connection) {
+    CliSession s = new CliSession(AlteonCliUtils.convertConnection(connection));
+    try {
+        s.connect();
+        s.close();
+    } catch (Exception e) {
+        throw new AdcConnectionException("IOException while validating the connection. Please check the connection settings.", e);
+    }
+}
+
diff --git a/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/groovy/read_ips_data_from_service.groovy b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/groovy/read_ips_data_from_service.groovy
new file mode 100644
index 0000000000..8a6bd7d6e3
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/groovy/read_ips_data_from_service.groovy
@@ -0,0 +1,168 @@
+import groovy.transform.ToString
+import groovy.transform.EqualsAndHashCode
+
+import com.radware.alteon.beans.adc.*;
+import com.radware.alteon.api.*;
+import com.radware.alteon.sdk.*
+import com.radware.alteon.sdk.rpm.*
+import com.radware.alteon.api.impl.AlteonCliUtils;
+import com.radware.alteon.cli.CliSession;
+
+
+@ToString(includeNames=true)
+@EqualsAndHashCode(excludes=["gateway","mask","ips"])
+class SubnetInfo {
+    String id
+    String gateway
+    String mask
+    String ips
+}
+
+@ToString(includeNames=true)
+@EqualsAndHashCode(excludes=["subnets"])
+class PortInfo {
+    String name
+    def subnets = [:]
+}
+
+
+def tokenize_key(map_key) {
+    def ret_arr = map_key.tokenize(".")
+    if (ret_arr.size > 0 && ret_arr[0].startsWith("port")) {
+        return ret_arr
+    }
+    else
+        return null;
+}
+
+
+def parse(advanced_props) {
+    def ports = [:]
+    advanced_props.each {
+        key, value ->
+        def parsed_key = tokenize_key(key)
+        if (parsed_key) {
+            def port_name = parsed_key[0]
+            def subnet_id = parsed_key[1]
+            def property = parsed_key[2]
+            def port_info = ports.get(port_name)
+            if (port_info) {
+                def subnet_info = port_info.subnets.get(subnet_id)
+                if (subnet_info) {
+                    subnet_info[property] = value
+                }
+                else {
+                    subnet_info = new SubnetInfo(id:subnet_id)
+                    subnet_info[property] = value
+                    port_info.subnets.put(subnet_id, subnet_info)
+                }
+            }
+            else {
+                port_info = new PortInfo(name:port_name)
+                subnet_info = new SubnetInfo(id:subnet_id)
+                subnet_info[property] = value
+                port_info.subnets.put(subnet_id, subnet_info)
+                ports.put(port_name, port_info)
+            }
+        }
+    }
+    return ports
+}
+
+def get_property_per_port (ports, port_name, property_name) {
+    port_info = ports[port_name]
+    if (port_info) {
+        port_subnet = port_info.subnets
+        if (port_subnet && !port_subnet.isEmpty()) {
+            port_subnet_item = port_subnet.values().iterator().next()
+            port_subnet_property = port_subnet_item[property_name]
+            if (port_subnet_property) {
+                val_array = port_subnet_property.tokenize(",")
+                if (!val_array.isEmpty())
+                    return val_array[0]
+            }
+        }
+    }
+    else {
+        return null
+    }
+}
+
+def cidr_to_mask(cidr) throws NumberFormatException {
+
+    String[] st = cidr.split("\\/");
+    if (st.length != 2) {
+        throw new NumberFormatException("Invalid CIDR format '" +
+                cidr + "', should be: xx.xx.xx.xx/xx");
+    }
+    String symbolicIP = st[0];
+    String symbolicCIDR = st[1];
+
+    Integer numericCIDR = new Integer(symbolicCIDR);
+    if (numericCIDR > 32) {
+        throw new NumberFormatException("CIDR can not be greater than 32");
+    }
+    //Get IP
+    st = symbolicIP.split("\\.");
+    if (st.length != 4) {
+        throw new NumberFormatException("Invalid IP address: " + symbolicIP);
+    }
+    int i = 24;
+    baseIPnumeric = 0;
+    for (int n = 0; n < st.length; n++) {
+        int value = Integer.parseInt(st[n]);
+        if (value != (value & 0xff)) {
+            throw new NumberFormatException("Invalid IP address: " + symbolicIP);
+        }
+        baseIPnumeric += value << i;
+        i -= 8;
+    }
+    //Get netmask
+    if (numericCIDR < 1)
+        throw new NumberFormatException("Netmask CIDR can not be less than 1");
+    netmaskNumeric = 0xffffffff;
+    netmaskNumeric = netmaskNumeric << (32 - numericCIDR);
+    return netmaskNumeric
+}
+
+
+def String convert_numeric_ip_to_symbolic(ip) {
+    StringBuffer sb = new StringBuffer(15);
+    for (int shift = 24; shift > 0; shift -= 8) {
+        // process 3 bytes, from high order byte down.
+        def tmp = (ip >>> shift) & 0xff
+        sb.append(tmp)
+        sb.append('.');
+    }
+    sb.append(ip & 0xff);
+    return sb.toString();
+}
+
+
+primary_adc = sdk.read(service.getPrimaryId())
+primary_config = primary_adc.adcInfo.advancedConfiguration
+primary_ports = parse(primary_config)
+data_ip_address = get_property_per_port(primary_ports, "port1", "ips")
+data_ip_mask = convert_numeric_ip_to_symbolic(cidr_to_mask(get_property_per_port(primary_ports, "port1", "mask")))
+gateway = get_property_per_port(primary_ports, "port1", "gateway")
+
+if (service.request.ha) {
+    secondary_adc = sdk.read(service.getSecondaryId())
+    secondary_config = secondary_adc.adcInfo.advancedConfiguration
+    secondary_ports = parse(secondary_config)
+    ha_ip_address_1 = get_property_per_port(primary_ports, "port2", "ips")
+    ha_ip_address_2 = get_property_per_port(secondary_ports, "port2", "ips")
+    ha_vrrp_ip_address = ha_ip_address_1
+    ha_ip_mask = convert_numeric_ip_to_symbolic(cidr_to_mask(get_property_per_port(primary_ports, "port2", "mask")))
+}
+else {
+    secondary_adc = null
+    secondary_config = null
+    secondary_ports = null
+    ha_ip_address_1 = "1.1.1.1"
+    ha_ip_address_2 = "1.1.1.2"
+    ha_vrrp_ip_address = "1.1.1.3"
+    ha_ip_mask = "255.255.255.255"
+    ha_group_vr_id = 2
+}
+
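The two conversion helpers above chain together: cidr_to_mask keeps only the prefix length of a string such as "10.0.0.0/24" and shifts an all-ones word left, and convert_numeric_ip_to_symbolic renders the low four bytes of the result as a dotted quad. The same arithmetic, checked in Python for illustration (the explicit & 0xffffffff keeps Python's unbounded integers to 32 bits; the Groovy code gets the same net effect because convert_numeric_ip_to_symbolic only reads the low 32 bits):

    prefix = 24                                          # from "10.0.0.0/24"
    mask = (0xffffffff << (32 - prefix)) & 0xffffffff    # 0xffffff00
    print('.'.join(str((mask >> s) & 0xff) for s in (24, 16, 8, 0)))
    # -> 255.255.255.0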
diff --git a/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/setup_l2_l3.vm b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/setup_l2_l3.vm
new file mode 100644
index 0000000000..eee520dfb1
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/setup_l2_l3.vm
@@ -0,0 +1,131 @@
+
+#property('description', 'Configures VLANs and L3 interface for data and HA networks - v1')
+
+#param("data_port", "int", "in", "min=1", "max=2", "prompt=Data Port")
+#param("data_ip_address", "ip", "in", "prompt=Data IP Address")
+#param("data_ip_mask", "ip", "in", "prompt=Data IP Mask")
+#param("gateway", "ip", "in", "prompt=Default Gateway IP Address")
+
+#param("ha_enabled", "bool", "in", "prompt=HA Enabled?")
+#param("ha_port", "int", "in", "min=1", "max=2", "prompt=HA Port")
+#param("ha_ip_address", "ip", "in", "prompt=HA IP Address")
+#param("ha_ip_mask", "ip", "in", "prompt=HA IP Mask")
+#param("ha_vrrp_ip_address", "ip", "in", "prompt=HA VRRP IP Address")
+#param("ha_group_vr_id", "int", "in", "min=2", "max=1024", "prompt=HA Group VR ID (1 is allocated to the interface VR)")
+
+#param("data_interface_id", "int", "out")
+#param("gateway_id", "int", "out")
+#param("ha_interface_id", "int", "out")
+#param("data_vlan", "int", "out")
+#param("ha_vlan", "int", "out")
+
+#if($data_port == $ha_port)
+    #error("Data Port and HA Port must be on different Ports!!")
+#end
+
+#set($port = $adc.newBean('AgPortNewCfgTableEntry'))
+#set($port.Indx = $data_port)
+#set($port = $adc.read($port))
+#if ($adc.isNull($port))
+    ## The port was not found. Not too realistic, but if so, raise an error.
+    #error("Port $data_port was not found!!")
+#else
+    #set($data_vlan = $port.PVID)
+#end
+
+#set($port = $adc.newBean('AgPortNewCfgTableEntry'))
+#set($port.Indx = $ha_port)
+#set($port = $adc.read($port))
+#if ($adc.isNull($port))
+    ## The port was not found. Not too realistic, but if so, raise an error.
+    #error("Port $ha_port was not found!!")
+#else
+    #set($ha_vlan = $port.PVID)
+#end
+
+#set($Integer = 0)
+
+#set($data_interface_string = "#get_interface_id($data_ip_address, 1)")
+#set($data_interface_id = $Integer.parseInt($data_interface_string.trim()))
+#create_interface($data_ip_address, $data_ip_mask, $data_vlan, $data_interface_id)
+
+#set($gwb = $adc.newBean('/c/l3/gw'))
+#set($gwb.addr = $gateway)
+#set($gwb = $adc.findFirst($gwb))
+#if ($adc.isNull($gwb))
+    #set($gateway_id = $adc.getFreeIndexWithDefault('/c/l3/gw', 1))
+#else
+    #error("Gateway with address $gateway already exists on index $gwb.index")
+#end
+
+#if ($gateway_id < 5)
+/c/l3/gw $gateway_id
+    addr $gateway
+    arp ena
+    ena
+#else
+    #log('error', "The available gateway index $gateway_id cannot be used for a default gateway!")
+    #error("No available index for a default gateway!")
+#end
+
+#if($ha_enabled)
+    #set($ha_interface_string = "#get_interface_id($ha_ip_address, $data_interface_id)")
+    #set($ha_interface_id = $Integer.parseInt($ha_interface_string.trim()))
+    #create_interface($ha_ip_address, $ha_ip_mask, $ha_vlan, $ha_interface_id)
+
+    /c/l3/vrrp/on
+    /c/l3/vrrp/hotstan enabled
+
+    /c/l3/vrrp/vr 1
+        ena
+        ipver v4
+        vrid 1
+        if $ha_interface_id
+        addr $ha_vrrp_ip_address
+        share dis
+
+    /c/l3/vrrp/group
+        ena
+        ipver v4
+        vrid $ha_group_vr_id
+        if $ha_interface_id
+        share dis
+
+    /c/slb/port $data_port
+        hotstan ena
+
+    /c/slb/port $ha_port
+        intersw ena
+#else
+    #set($ha_interface_id = 0)
+#end
+
+/c/slb
+    on
+
+/c/slb/port $data_port
+    client ena
+    server ena
+    proxy ena
+
+#macro(get_interface_id, $address, $default_index)
+    #set($interface = $adc.newBean('/c/l3/if'))
+    #set($interface.addr = $address)
+    #set($interface = $adc.findFirst($interface))
+    #if ($adc.isNull($interface))
+        ## IP address not found
+        #set($interface_id = $adc.getFreeIndexWithDefault('/c/l3/if', $default_index))
+        $interface_id
+    #else
+        ## Found existing interface with this address
+        #error("Found existing interface with address $address on index $interface.index!!")
+    #end
+#end
+
+#macro(create_interface, $address, $mask, $vlan, $interface_id)
+    /c/l3/if $interface_id
+        addr $address
+        mask $mask
+        vlan $vlan
+        ena
+#end
diff --git a/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/teardown_l2_l3.vm b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/teardown_l2_l3.vm
new file mode 100644
index 0000000000..8cbcf607cc
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/templates/teardown_l2_l3.vm
@@ -0,0 +1,45 @@
+
+#property('description', 'Cleanup VLANs and L3 interface for data and HA networks - v1')
+
+#param("data_port", "int", "in", "min=1", "max=2", "prompt=Data Port")
+#param("data_interface_id", "int", "in", "min=1", "max=256", "prompt=Data Interface ID")
+#param("gateway_id", "int", "in", "min=1", "max=4", "prompt=Default Gateway ID")
+#param("ha_enabled", "bool", "in", "prompt=HA Enabled?")
+#param("ha_port", "int", "in", "min=1", "max=2", "prompt=HA Port")
+#param("ha_interface_id", "int", "in", "min=1", "max=256", "prompt=HA Interface ID")
+
+
+#if($ha_enabled)
+    /c/slb/port $data_port
+        hotstan dis
+
+    /c/slb/port $ha_port
+        intersw dis
+
+    /c/l3/vrrp/group
+        del
+
+    /c/l3/vrrp/vr 1
+        del
+
+    /c/l3/vrrp/hotstan dis
+
+    /c/l3/vrrp/off
+
+    #delete_interface($ha_interface_id)
+
+#end
+
+/c/slb
+    off
+
+/c/l3/gw $gateway_id
+    del
+
+#delete_interface($data_interface_id)
+
+
+#macro(delete_interface, $interface_id)
+/c/l3/if $interface_id
+    del
+#end
diff --git a/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/workflow/workflow.xml b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/workflow/workflow.xml
new file mode 100644
index 0000000000..48c0c40df4
--- /dev/null
+++ b/neutron/services/loadbalancer/drivers/radware/workflows/openstack_l2_l3/workflow/workflow.xml
@@ -0,0 +1,166 @@
+    Workflow to setup L2 and L3 for Alteon VA, Single or HA Pair, in Hot Standby [2013-07-25 11:50:20.285000]