# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Avishay Balderman, Radware

"""Radware LBaaS driver.

Pushes Neutron load-balancer configuration (vips, pools, members,
health monitors) to a Radware vDirect server by running vDirect
"workflows" over its REST API.  Workflow operations are asynchronous:
each one is tracked by a background OperationCompletionHander thread
that polls vDirect and updates (or deletes) the corresponding rows in
the Neutron DB when the operation completes.
"""

import base64
import copy
import httplib
import os
import Queue
import socket
from StringIO import StringIO
import threading
import time
from zipfile import ZipFile

import eventlet
from oslo.config import cfg

from neutron.common import exceptions as q_exc
from neutron.common import log as call_log
from neutron import context as qcontext
import neutron.db.loadbalancer.loadbalancer_db as lb_db
from neutron.extensions import loadbalancer
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver

# Patch the threading primitives so the completion-handler thread below
# cooperates with eventlet green threads used elsewhere in Neutron.
eventlet.monkey_patch(thread=True)

LOG = logging.getLogger(__name__)

# Indices into the 4-tuple returned by vDirectRESTClient.call().
RESP_STATUS = 0
RESP_REASON = 1
RESP_STR = 2
RESP_DATA = 3

# Names of the workflow templates shipped with the driver (under
# ./workflows/) and uploaded to the vDirect server on first use.
L2_L3_WORKFLOW_TEMPLATE_NAME = 'openstack_l2_l3'
L4_WORKFLOW_TEMPLATE_NAME = 'openstack_l4'
# Actions whose parameters are passed through untranslated and whose
# completion is NOT tracked via the operation queue.
ACTIONS_TO_SKIP = ['setup_l2_l3']
L4_ACTION_NAME = 'BaseCreate'

# Content-Type headers required by the different vDirect REST resources.
TEMPLATE_HEADER = {'Content-Type':
                   'application/vnd.com.radware.vdirect.'
                   'template-parameters+json'}
PROVISION_HEADER = {'Content-Type':
                    'application/vnd.com.radware.'
                    'vdirect.status+json'}
CREATE_SERVICE_HEADER = {'Content-Type':
                         'application/vnd.com.radware.'
                         'vdirect.adc-service-specification+json'}
ZIP_HEADER = {'Content-Type': 'application/x-zip-compressed'}

# Constructor parameters for the l2_l3 workflow.  NOTE(review): this is a
# module-level mutable dict whose "service" slot is overwritten in
# _get_service() before each use — not safe if two drivers run concurrently.
L2_CTOR_PARAMS = {"service": "_REPLACE_", "ha_network_name": "HA-Network",
                  "ha_ip_pool_name": "default", "allocate_ha_vrrp": True,
                  "allocate_ha_ips": True}
# Parameters for the l2_l3 workflow's 'setup_l2_l3' action.
L2_SETUP_L2_L3_PARAMS = {"data_port": 1, "data_ip_address": "192.168.200.99",
                         "data_ip_mask": "255.255.255.0",
                         "gateway": "192.168.200.1", "ha_port": 2}

# Driver configuration, registered under the [radware] section.
driver_opts = [
    cfg.StrOpt('vdirect_address',
               help=_('vdirect server IP address')),
    cfg.BoolOpt('service_ha_pair',
                default=False,
                help=_('service HA pair')),
    cfg.IntOpt('service_throughput',
               default=1000,
               help=_('service throughtput')),
    cfg.IntOpt('service_ssl_throughput',
               default=100,
               help=_('service ssl throughtput')),
    cfg.IntOpt('service_compression_throughput',
               default=100,
               help=_('service compression throughtput')),
    cfg.IntOpt('service_cache',
               default=20,
               help=_('service cache'))
]

cfg.CONF.register_opts(driver_opts, "radware")


class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):

    """Radware lbaas driver."""

    def __init__(self, plugin):
        """Build the ADC service spec from config and start the worker.

        Spawns a daemon OperationCompletionHander thread that consumes
        operations pushed onto self.queue by _update_workflow() and
        _remove_workflow().
        """
        rad = cfg.CONF.radware
        self.plugin = plugin
        # Template for the vDirect ADC service specification; a deep copy
        # is customized per network in _create_service().
        self.service = {
            "haPair": rad.service_ha_pair,
            "primary": {
                "capacity": {
                    "throughput": rad.service_throughput,
                    "sslThroughput": rad.service_ssl_throughput,
                    "compressionThroughput":
                        rad.service_compression_throughput,
                    "cache": rad.service_cache
                },
                "network": {
                    "type": "portgroup",
                    "portgroups": ['DATA_NETWORK']
                },
                "adcType": "VA",
                "acceptableAdc": "Exact"
            }
        }
        vdirect_address = cfg.CONF.radware.vdirect_address
        self.rest_client = vDirectRESTClient(server=vdirect_address)
        self.queue = Queue.Queue()
        self.completion_handler = OperationCompletionHander(self.queue,
                                                            self.rest_client,
                                                            plugin)
        # Workflow templates are uploaded lazily, on the first operation.
        self.workflows_were_uploaded = False
        self.completion_handler.setDaemon(True)
        self.completion_handler.start()

    def create_vip(self, context, vip):
        """Create a vip: ensure the service + workflows exist, then push."""
        LOG.debug(_('create_vip. vip: %s'), str(vip))
        extended_vip = self.plugin.populate_vip_graph(context, vip)
        LOG.debug(_('create_vip. extended_vip: %s'), str(extended_vip))
        network_id = self._get_vip_network_id(context, extended_vip)
        LOG.debug(_('create_vip. network_id: %s '), str(network_id))
        service_name = self._get_service(extended_vip['pool_id'], network_id)
        LOG.debug(_('create_vip. service_name: %s '), service_name)
        # The L4 workflow is named after the pool id.
        self._create_workflow(
            vip['pool_id'], L4_WORKFLOW_TEMPLATE_NAME,
            {"service": service_name})
        self._update_workflow(
            vip['pool_id'],
            L4_ACTION_NAME,
            extended_vip)

    def update_vip(self, context, old_vip, vip):
        """Push the updated vip graph to the existing L4 workflow."""
        extended_vip = self.plugin.populate_vip_graph(context, vip)
        self._update_workflow(
            vip['pool_id'], L4_ACTION_NAME,
            extended_vip, False, lb_db.Vip, vip['id'])

    def delete_vip(self, context, vip):
        """Delete a Vip

        First delete it from the device. If deletion ended OK
        - remove data from DB as well.
        If the deletion failed - mark elements with error status in DB

        """
        extended_vip = self.plugin.populate_vip_graph(context, vip)
        try:
            # removing the WF will cause deletion of the configuration from
            # the device
            self._remove_workflow(extended_vip, context)
        except Exception:
            pool_id = extended_vip['pool_id']
            LOG.exception(_('Failed to remove workflow %s'), pool_id)
            _update_vip_graph_status(
                self.plugin, context, extended_vip, constants.ERROR
            )

    def create_pool(self, context, pool):
        """No device-side action: a pool alone has no workflow yet."""
        # nothing to do
        pass

    def update_pool(self, context, old_pool, pool):
        """Push the pool change to the device if it is attached to a vip."""
        self._handle_pool(context, pool)

    def delete_pool(self, context, pool,):
        """Delete the pool from the DB; refuse if a vip still uses it."""
        self._handle_pool(context, pool, delete=True)

    def _handle_pool(self, context, pool, delete=False):
        """Update or delete a pool depending on vip association.

        A pool attached to a vip cannot be deleted; an update on such a
        pool triggers a full vip-graph workflow update.  A detached pool
        only exists in the DB.
        """
        vip_id = self.plugin.get_pool(context, pool['id']).get('vip_id', None)
        if vip_id:
            if delete:
                raise loadbalancer.PoolInUse(pool_id=pool['id'])
            else:
                vip = self.plugin.get_vip(context, vip_id)
                extended_vip = self.plugin.populate_vip_graph(context, vip)
                self._update_workflow(
                    pool['id'], L4_ACTION_NAME,
                    extended_vip, delete, lb_db.Pool, pool['id'])
        else:
            if delete:
                self.plugin._delete_db_pool(context, pool['id'])
            else:
                # we keep the pool in PENDING_UPDATE
                # no point to modify it since it is not connected to vip yet
                pass

    def create_member(self, context, member):
        """Push the new member to the device via its pool's vip graph."""
        self._handle_member(context, member)

    def update_member(self, context, old_member, member):
        """Push the member change to the device via its pool's vip graph."""
        self._handle_member(context, member)

    def delete_member(self, context, member):
        """Remove the member from the device (or just from the DB)."""
        self._handle_member(context, member, delete=True)

    def _handle_member(self, context, member, delete=False):
        """Navigate the model. If a Vip is found - activate a bulk WF action.
        """
        vip_id = self.plugin.get_pool(
            context, member['pool_id']).get('vip_id')
        if vip_id:
            vip = self.plugin.get_vip(context, vip_id)
            extended_vip = self.plugin.populate_vip_graph(context, vip)
            self._update_workflow(
                member['pool_id'], L4_ACTION_NAME,
                extended_vip, delete, lb_db.Member, member['id'])
        # We have to delete this member but it is not connected to a vip yet
        elif delete:
            self.plugin._delete_db_member(context, member['id'])

    def create_health_monitor(self, context, health_monitor):
        """No device-side action until the monitor joins a pool."""
        # Anything to do here? the hm is not connected to the graph yet
        pass

    def update_health_monitor(self, context, old_health_monitor,
                              health_monitor, pool_id):
        """Push the monitor change via the pool's vip graph, if any."""
        self._handle_pool_health_monitor(context, health_monitor, pool_id)

    def create_pool_health_monitor(self, context,
                                   health_monitor, pool_id):
        """Push the new pool/monitor association via the vip graph."""
        self._handle_pool_health_monitor(context, health_monitor, pool_id)

    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
        """Remove the pool/monitor association (device and/or DB)."""
        self._handle_pool_health_monitor(context, health_monitor, pool_id,
                                         True)

    def _handle_pool_health_monitor(self, context, health_monitor,
                                    pool_id, delete=False):
        """Push a graph to vDirect

        Navigate the model. Check if a pool is associated to the vip
        and push the graph to vDirect

        """
        vip_id = self.plugin.get_pool(context, pool_id).get('vip_id', None)

        debug_params = {"hm_id": health_monitor['id'], "pool_id": pool_id,
                        "delete": delete, "vip_id": vip_id}
        LOG.debug(_('_handle_pool_health_monitor. health_monitor = %(hm_id)s '
                    'pool_id = %(pool_id)s delete = %(delete)s '
                    'vip_id = %(vip_id)s'),
                  debug_params)

        if vip_id:
            vip = self.plugin.get_vip(context, vip_id)
            extended_vip = self.plugin.populate_vip_graph(context, vip)
            self._update_workflow(pool_id, L4_ACTION_NAME,
                                  extended_vip, delete,
                                  lb_db.PoolMonitorAssociation,
                                  health_monitor['id'])
        elif delete:
            self.plugin._delete_db_pool_health_monitor(context,
                                                       health_monitor['id'],
                                                       pool_id)

    def stats(self, context, pool_id):
        """Return pool statistics.  Currently a stub returning zeros."""
        # TODO(avishayb) implement
        return {"bytes_in": 0,
                "bytes_out": 0,
                "active_connections": 0,
                "total_connections": 0}

    def _get_vip_network_id(self, context, extended_vip):
        """Return the network id of the vip's subnet."""
        subnet = self.plugin._core_plugin.get_subnet(
            context, extended_vip['subnet_id'])
        return subnet['network_id']

    @call_log.log
    def _update_workflow(self, wf_name, action,
                         wf_params, delete=False,
                         lbaas_entity=None, entity_id=None):
        """Update the WF state. Push the result to a queue for processing."""

        if not self.workflows_were_uploaded:
            self._upload_workflows_templates()

        # Actions in ACTIONS_TO_SKIP take raw params and are not tracked.
        if action not in ACTIONS_TO_SKIP:
            params = _translate_vip_object_graph(wf_params)
        else:
            params = wf_params

        resource = '/api/workflow/%s/action/%s' % (wf_name, action)
        response = _rest_wrapper(self.rest_client.call('POST', resource,
                                 {'parameters': params},
                                 TEMPLATE_HEADER))
        LOG.debug(_('_update_workflow response: %s '), response)

        if action not in ACTIONS_TO_SKIP:
            ids = params.pop('__ids__', None)
            if not ids:
                raise q_exc.NeutronException(
                    _('params must contain __ids__ field!')
                )

            # Track the async vDirect operation; the completion-handler
            # thread will update the DB when it finishes.
            oper = OperationAttributes(response['uri'],
                                       ids,
                                       lbaas_entity,
                                       entity_id,
                                       delete=delete)
            LOG.debug(_('Pushing operation %s to the queue'), oper)
            self.queue.put_nowait(oper)

    def _remove_workflow(self, wf_params, context):
        """Delete the vip's L4 workflow from vDirect.

        A 404 ("Not Found") means the device config is already gone, so
        the vip is deleted from the DB immediately; otherwise the async
        delete operation is queued for the completion handler.
        """
        params = _translate_vip_object_graph(wf_params)
        ids = params.pop('__ids__', None)
        if not ids:
            raise q_exc.NeutronException(
                _('params must contain __ids__ field!')
            )
        wf_name = ids['pool']
        LOG.debug(_('Remove the workflow %s') % wf_name)
        resource = '/api/workflow/%s' % (wf_name)
        response = _rest_wrapper(self.rest_client.call('DELETE', resource,
                                 None, None),
                                 [204, 202, 404])
        msg = response.get('message', None)
        if msg == "Not Found":
            self.plugin._delete_db_vip(context, ids['vip'])
        else:
            oper = OperationAttributes(response['uri'],
                                       ids,
                                       lb_db.Vip,
                                       ids['vip'],
                                       delete=True)
            LOG.debug(_('Pushing operation %s to the queue'), oper)
            self.queue.put_nowait(oper)

    def _remove_service(self, service_name):
        """Delete an ADC service from vDirect."""
        resource = '/api/service/%s' % (service_name)
        _rest_wrapper(self.rest_client.call('DELETE', resource,
                      None, None),
                      [202])

    def _get_service(self, pool_id, network_id):
        """Get a service name.

        If no service exists for the network, create one and run the
        l2_l3 workflow against it.

        """
        incoming_service_name = 'srv_' + network_id
        service_name = self._get_available_service(incoming_service_name)
        if not service_name:
            LOG.debug(
                'Could not find a service named ' + incoming_service_name)
            service_name = self._create_service(pool_id, network_id)
            # NOTE(review): mutates the shared module-level dict in place.
            L2_CTOR_PARAMS["service"] = incoming_service_name
            wf_name = 'l2_l3_' + network_id
            if not self.workflows_were_uploaded:
                self._upload_workflows_templates()
            self._create_workflow(
                wf_name, L2_L3_WORKFLOW_TEMPLATE_NAME, L2_CTOR_PARAMS)
            self._update_workflow(
                wf_name, "setup_l2_l3", L2_SETUP_L2_L3_PARAMS)
        else:
            LOG.debug('A service named ' + service_name + ' was found.')
        return service_name

    def _create_service(self, pool_id, network_id):
        """create the service and provision it (async)."""
        # 1) create the service
        service_name = 'srv_' + network_id
        resource = '/api/service?name=%s' % service_name
        service = copy.deepcopy(self.service)
        service['primary']['network']['portgroups'] = [network_id]
        response = _rest_wrapper(self.rest_client.call('POST', resource,
                                 service,
                                 CREATE_SERVICE_HEADER), [201])

        # 2) provision the service
        provision_uri = response['links']['actions']['provision']
        _rest_wrapper(self.rest_client.call('POST', provision_uri,
                      None, PROVISION_HEADER))
        return service_name

    def _get_available_service(self, service_name):
        """Check if service exists and return its name if it does."""
        resource = '/api/service/' + service_name
        try:
            # Any non-200 answer (or connection error) means "not there".
            _rest_wrapper(self.rest_client.call('GET',
                          resource,
                          None, None), [200])
        except Exception:
            return
        return service_name

    def _workflow_exists(self, pool_id):
        """Check if a WF having the name of the pool_id exists."""
        resource = '/api/workflow/' + pool_id
        try:
            _rest_wrapper(self.rest_client.call('GET',
                          resource,
                          None,
                          None), [200])
        except Exception:
            return False
        return True

    def _create_workflow(self, wf_name, wf_template_name,
                         create_workflow_params=None):
        """Create a WF from a template if it doesn't exist yet."""
        if not self.workflows_were_uploaded:
            self._upload_workflows_templates()
        if not self._workflow_exists(wf_name):
            if not create_workflow_params:
                create_workflow_params = {}
            resource = '/api/workflowTemplate/%s?name=%s' % (
                wf_template_name, wf_name)
            params = {'parameters': create_workflow_params}
            response = _rest_wrapper(self.rest_client.call('POST',
                                     resource,
                                     params,
                                     TEMPLATE_HEADER))
            LOG.debug(_('create_workflow response: %s'), str(response))

    def _upload_workflows_templates(self):
        """Upload the driver workflows to vDirect server."""
        # Start by assuming neither template is on the server.
        workflows = {L2_L3_WORKFLOW_TEMPLATE_NAME:
                     False, L4_WORKFLOW_TEMPLATE_NAME: False}
        resource = '/api/workflowTemplate'
        response = _rest_wrapper(self.rest_client.call('GET',
                                 resource,
                                 None,
                                 None), [200])
        for wf in workflows.keys():
            for wf_template in response:
                if wf == wf_template['name']:
                    workflows[wf] = True
                    break
        for wf, found in workflows.items():
            if not found:
                self._upload_workflow_template(wf)
        self.workflows_were_uploaded = True

    def _upload_workflow_template(self, wf_template_name):
        """Upload a wf template to vDirect server."""
        def _get_folders():
            # Templates live next to this module, under ./workflows/.
            current_folder = os.path.dirname(os.path.realpath(__file__))
            folders = [current_folder + '/workflows/' + wf_template_name,
                       current_folder + '/workflows/common']
            return folders
        LOG.debug(_('About to upload wf template named %s.zip'),
                  wf_template_name)
        data = self._get_workflow_zip_data(_get_folders())
        _rest_wrapper(self.rest_client.call('POST',
                      '/api/workflowTemplate',
                      data,
                      ZIP_HEADER, binary=True), [201])

    def _get_workflow_zip_data(self, folders):
        """Create a zip file on the fly and return its content."""
        def _file_to_zip(f):
            # Only the workflow descriptor and template/script files go in.
            n, ext = os.path.splitext(f)
            LOG.debug("file name = " + n + " ext = " + ext)
            return f == 'workflow.xml' or ext in ['.vm', '.groovy']
        in_memory_file = StringIO()
        zip_file = ZipFile(in_memory_file, 'w')
        LOG.debug(_('Folders are %s'), folders)
        for folder in folders:
            LOG.debug(_('Folder is %s'), folder)
            for root, dirs, files in os.walk(folder):
                for file in files:
                    if _file_to_zip(file):
                        LOG.debug(_('About to add file %s to zip'), str(file))
                        LOG.debug(_('Path: %s'), os.path.join(root, file))
                        # Flatten: archive members keep the basename only.
                        zip_file.write(os.path.join(root, file),
                                       os.path.basename(file))
                        LOG.debug(_('File %s was added to zip'), str(file))
        zip_file.close()
        return in_memory_file.getvalue()


class vDirectRESTClient:

    """REST server proxy to Radware vDirect."""

    def __init__(self,
                 server='localhost',
                 port=2188,
                 ssl=None,
                 auth=None,
                 timeout=5000,
                 base_uri=''):
        self.server = server
        self.port = port
        self.ssl = ssl
        self.base_uri = base_uri
        self.timeout = timeout
        # NOTE(review): the Authorization value is built here but call()
        # never adds it to the request headers — confirm intended.
        self.auth = None
        if auth:
            self.auth = 'Basic ' + base64.encodestring(auth).strip()
        debug_params = {'server': self.server,
                        'port': self.port,
                        'ssl': self.ssl}
        LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
                    'port=%(port)d, '
                    'ssl=%(ssl)r'), debug_params)

    @call_log.log
    def call(self, action, resource, data, headers, binary=False):
        """Issue one HTTP(S) request and return a 4-tuple.

        Returns (status, reason, body-string, parsed-body); on connection
        failure returns (0, None, None, None).  Index with the RESP_*
        constants.
        """
        if resource.startswith('http'):
            uri = resource
        else:
            uri = self.base_uri + resource
        if binary:
            body = data
        else:
            body = json.dumps(data)
        # NOTE(review): debug_data is computed but never logged here —
        # looks like leftover from a removed LOG call; confirm.
        debug_data = 'binary' if binary else body
        debug_data = debug_data if debug_data else 'EMPTY'
        if not headers:
            headers = {}
        conn = None
        if self.ssl:
            conn = httplib.HTTPSConnection(
                self.server, self.port, timeout=self.timeout)
            # NOTE(review): httplib constructors never return None, so this
            # branch (and the HTTP one below) appears unreachable.
            if conn is None:
                LOG.error(_('vdirectRESTClient: Could not establish HTTPS '
                            'connection'))
                return 0, None, None, None
        else:
            conn = httplib.HTTPConnection(
                self.server, self.port, timeout=self.timeout)
            if conn is None:
                LOG.error(_('vdirectRESTClient: Could not establish HTTP '
                            'connection'))
                return 0, None, None, None

        try:
            conn.request(action, uri, body, headers)
            response = conn.getresponse()
            respstr = response.read()
            respdata = respstr
            try:
                respdata = json.loads(respstr)
            except ValueError:
                # response was not JSON, ignore the exception
                pass
            ret = (response.status, response.reason, respstr, respdata)
        except (socket.timeout, socket.error) as e:
            log_dict = {'action': action, 'e': e}
            LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
                      log_dict)
            ret = 0, None, None, None
        conn.close()
        return ret


class OperationAttributes:

    """Holds operation attributes.

    Describes one pending asynchronous vDirect operation: the URL to
    poll, the id graph it affects, and which DB entity (if any) to
    update or delete on completion.
    """

    def __init__(self,
                 operation_url,
                 object_graph,
                 lbaas_entity=None,
                 entity_id=None,
                 delete=False):
        self.operation_url = operation_url
        self.object_graph = object_graph
        self.delete = delete
        self.lbaas_entity = lbaas_entity
        self.entity_id = entity_id
        # Recorded so completion latency can be logged.
        self.creation_time = time.time()

    def __repr__(self):
        items = ("%s = %r" % (k, v) for k, v in self.__dict__.items())
        return "<%s: {%s}>" % (self.__class__.__name__, ', '.join(items))


class OperationCompletionHander(threading.Thread):

    # NOTE(review): class name is misspelled ("Hander"); kept as-is since
    # renaming would touch every reference.
    """Update DB with operation status or delete the entity from DB."""

    def __init__(self, queue, rest_client, plugin):
        threading.Thread.__init__(self)
        self.queue = queue
        self.rest_client = rest_client
        self.admin_ctx = qcontext.get_admin_context()
        self.plugin = plugin
        # Set by join() to ask the polling loop to exit.
        self.stoprequest = threading.Event()

    def _get_db_status(self, operation, success, messages=None):
        """Get the db_status based on the status of the vdirect operation.

        Returns ERROR on failure, ACTIVE on successful create/update, and
        None on successful delete (meaning: remove the row instead).
        """
        if not success:
            # we have a failure - log it and set the return ERROR as DB state
            msg = ', '.join(messages) if messages else "unknown"
            error_params = {"operation": operation, "msg": msg}
            LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
                      error_params)
            return constants.ERROR
        if operation.delete:
            return None
        else:
            return constants.ACTIVE

    def join(self, timeout=None):
        """Signal the loop to stop, then join the thread."""
        self.stoprequest.set()
        super(OperationCompletionHander, self).join(timeout)

    def run(self):
        """Poll queued operations until asked to stop.

        Completed operations update (or delete) their DB rows; pending
        ones are re-queued, with a 1-second nap when the queue is empty
        to avoid busy-spinning on a single pending operation.
        """
        oper = None
        while not self.stoprequest.isSet():
            try:
                oper = self.queue.get(timeout=1)
                LOG.debug('Operation consumed from the queue: ' +
                          str(oper))
                # check the status - if oper is done: update the db ,
                # else push the oper again to the queue
                result = self.rest_client.call('GET',
                                               oper.operation_url,
                                               None,
                                               None)
                completed = result[RESP_DATA]['complete']
                if completed:
                    # operation is done - update the DB with the status
                    # or delete the entire graph from DB
                    success = result[RESP_DATA]['success']
                    sec_to_completion = time.time() - oper.creation_time
                    debug_data = {'oper': oper,
                                  'sec_to_completion': sec_to_completion,
                                  'success': success}
                    LOG.debug(_('Operation %(oper)s is completed after '
                              '%(sec_to_completion)d sec '
                              'with success status: %(success)s :'),
                              debug_data)
                    db_status = self._get_db_status(oper, success)
                    if db_status:
                        _update_vip_graph_status(
                            self.plugin, self.admin_ctx, oper, db_status)
                    else:
                        _remove_object_from_db(
                            self.plugin, self.admin_ctx, oper)
                else:
                    # not completed - push to the queue again
                    LOG.debug(_('Operation %s is not completed yet..') % oper)
                    # queue is empty - lets take a short rest
                    if self.queue.empty():
                        time.sleep(1)
                    self.queue.put_nowait(oper)
                # send a signal to the queue that the job is done
                self.queue.task_done()
            except Queue.Empty:
                continue
            except Exception:
                m = _("Exception was thrown inside OperationCompletionHander")
                LOG.exception(m)


def _rest_wrapper(response, success_codes=[202]):
    """Wrap a REST call and make sure a valid status is returned.

    NOTE(review): mutable default argument; harmless here because it is
    only read, but worth cleaning up.
    """
    if response[RESP_STATUS] not in success_codes:
        raise q_exc.NeutronException(str(response[RESP_STATUS]) +
                                     ':' + response[RESP_REASON] +
                                     '. Error description: ' +
                                     response[RESP_STR])
    else:
        return response[RESP_DATA]


def _update_vip_graph_status(plugin, context, oper, status):
    """Update the status

    Of all the Vip object graph
    or a specific entity in the graph.

    """
    LOG.debug(_('_update: %s '), oper)
    if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
        plugin.update_pool_health_monitor(context,
                                          oper.entity_id,
                                          oper.object_graph['pool'],
                                          status)
    elif oper.entity_id:
        plugin.update_status(context,
                             oper.lbaas_entity,
                             oper.entity_id,
                             status)
    else:
        # update the whole vip graph status
        plugin.update_status(context,
                             lb_db.Vip,
                             oper.object_graph['vip'],
                             status)
        plugin.update_status(context,
                             lb_db.Pool,
                             oper.object_graph['pool'],
                             status)
        for member_id in oper.object_graph['members']:
            plugin.update_status(context,
                                 lb_db.Member,
                                 member_id,
                                 status)
        for hm_id in oper.object_graph['health_monitors']:
            plugin.update_pool_health_monitor(context,
                                              hm_id,
                                              oper.object_graph['pool'],
                                              status)


def _remove_object_from_db(plugin, context, oper):
    """Remove a specific entity from db."""
    LOG.debug(_('_remove_object_from_db %s'), str(oper))
    if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
        # NOTE(review): the id graph built by _translate_vip_object_graph
        # uses the key 'pool', not 'pool_id' — this lookup looks like it
        # would raise KeyError; confirm.
        plugin._delete_db_pool_health_monitor(context,
                                              oper.entity_id,
                                              oper.object_graph['pool_id'])
    elif oper.lbaas_entity == lb_db.Member:
        plugin._delete_db_member(context, oper.entity_id)
    elif oper.lbaas_entity == lb_db.Vip:
        plugin._delete_db_vip(context, oper.entity_id)
    elif oper.lbaas_entity == lb_db.Pool:
        plugin._delete_db_pool(context, oper.entity_id)
    else:
        raise q_exc.NeutronException(
            _('Tried to remove unsupported lbaas entity %s!'),
            str(oper.lbaas_entity)
        )

# Defaults substituted for graph properties that are absent from the
# Neutron objects when building workflow parameters.
TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP',
                        'session_persistence_cookie_name': 'none',
                        'url_path': '/',
                        'http_method': 'GET',
                        'expected_codes': '200'
                        }
# Properties copied from each graph node into the workflow parameters.
VIP_PROPERTIES = ['address', 'protocol_port', 'protocol', 'connection_limit',
                  'admin_state_up', 'session_persistence_type',
                  'session_persistence_cookie_name']
POOL_PROPERTIES = ['protocol', 'lb_method', 'admin_state_up']
# Properties copied from each member / health-monitor node into the
# per-node workflow parameter arrays.
MEMBER_PROPERTIES = ['address', 'protocol_port', 'weight', 'admin_state_up']
HEALTH_MONITOR_PROPERTIES = ['type', 'delay', 'timeout', 'max_retries',
                             'admin_state_up', 'url_path', 'http_method',
                             'expected_codes', 'id']


def _translate_vip_object_graph(extended_vip):
    """Flatten an extended vip graph into workflow parameters.

    Produces a dict with scalar vip/pool properties, one value array per
    member and health-monitor property, and an '__ids__' entry holding
    the ids of every node in the graph.
    """

    def _array_key(prefix, prop):
        # Workflow array parameters are named <prefix>_<prop>_array.
        return prefix + '_' + prop + '_array'

    def _hm_prop_name(prop):
        # The workflow refers to the monitor id as 'uuid'.
        return 'uuid' if prop == 'id' else prop

    LOG.debug('Vip graph to be translated: ' + str(extended_vip))
    trans_vip = {}
    for prop in VIP_PROPERTIES:
        trans_vip['vip_' + prop] = extended_vip.get(
            prop, TRANSLATION_DEFAULTS.get(prop))
    pool = extended_vip['pool']
    for prop in POOL_PROPERTIES:
        trans_vip['pool_' + prop] = pool[prop]
    members = extended_vip['members']
    for prop in MEMBER_PROPERTIES:
        trans_vip[_array_key('member', prop)] = [
            member.get(prop, TRANSLATION_DEFAULTS.get(prop))
            for member in members
        ]
    monitors = extended_vip['health_monitors']
    for prop in HEALTH_MONITOR_PROPERTIES:
        trans_vip[_array_key('hm', _hm_prop_name(prop))] = [
            hm.get(prop, TRANSLATION_DEFAULTS.get(prop))
            for hm in monitors
        ]
    trans_vip['__ids__'] = {
        'vip': extended_vip['id'],
        'pool': pool['id'],
        'members': [member['id'] for member in members],
        'health_monitors': [hm['id'] for hm in monitors],
    }
    LOG.debug('Translated Vip graph: ' + str(trans_vip))
    return trans_vip


def _drop_pending_delete_elements(extended_vip):
    """Traverse the Vip object graph and drop PENDING_DELETE nodes."""
    # What if the pool is pending_delete?
    for collection in ('health_monitors', 'members'):
        extended_vip[collection] = [
            node for node in extended_vip[collection]
            if node['status'] != constants.PENDING_DELETE
        ]
    return extended_vip