vmware-nsx/quantum/db/loadbalancer/loadbalancer_db.py
He Jie Xu 4e7521bb9c Supporting pagination in api v2.0
Implements bp support-pagination-in-api-v2.0

Add sort and pagination feature for api v2.0.
*Add sort_key, sort_dir for sorting feature.
*Add limit, marker, page_reverse for pagination feature.
*Add emulated sorting and pagination

Native pagination and native sorting implemented for:
*plugins: ovs, linuxbridge
*extensions: l3 securitygroup

Emulated pagination and sorting enabled for:
extensions: lbaas

Change-Id: I28ec6ab7bcae9839cb42c6d60bbfff7250de13ed
2013-02-19 23:04:13 +08:00

697 lines
29 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack LLC. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from sqlalchemy.sql import expression as expr
import webob.exc as w_exc
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import loadbalancer
from quantum.extensions.loadbalancer import LoadBalancerPluginBase
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
from quantum import policy
LOG = logging.getLogger(__name__)
class SessionPersistence(model_base.BASEV2):
vip_id = sa.Column(sa.String(36),
sa.ForeignKey("vips.id"),
primary_key=True)
type = sa.Column(sa.Enum("SOURCE_IP",
"HTTP_COOKIE",
"APP_COOKIE",
name="sesssionpersistences_type"),
nullable=False)
cookie_name = sa.Column(sa.String(1024))
class PoolStatistics(model_base.BASEV2):
"""Represents pool statistics """
pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"),
primary_key=True)
bytes_in = sa.Column(sa.Integer, nullable=False)
bytes_out = sa.Column(sa.Integer, nullable=False)
active_connections = sa.Column(sa.Integer, nullable=False)
total_connections = sa.Column(sa.Integer, nullable=False)
class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a v2 quantum loadbalancer vip."""
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
subnet_id = sa.Column(sa.String(36), nullable=False)
address = sa.Column(sa.String(64))
port = sa.Column(sa.Integer, nullable=False)
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
nullable=False)
pool_id = sa.Column(sa.String(36), nullable=False)
session_persistence = orm.relationship(SessionPersistence,
uselist=False,
backref="vips",
cascade="all, delete-orphan")
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
connection_limit = sa.Column(sa.Integer)
class Member(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a v2 quantum loadbalancer member."""
pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"),
nullable=False)
address = sa.Column(sa.String(64), nullable=False)
port = sa.Column(sa.Integer, nullable=False)
weight = sa.Column(sa.Integer, nullable=False)
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a v2 quantum loadbalancer pool."""
vip_id = sa.Column(sa.String(36), sa.ForeignKey("vips.id"))
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
subnet_id = sa.Column(sa.String(36), nullable=False)
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
nullable=False)
lb_method = sa.Column(sa.Enum("ROUND_ROBIN",
"LEAST_CONNECTIONS",
"SOURCE_IP",
name="pools_lb_method"),
nullable=False)
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
stats = orm.relationship(PoolStatistics,
uselist=False,
backref="pools",
cascade="all, delete-orphan")
members = orm.relationship(Member, backref="pools",
cascade="all, delete-orphan")
monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
cascade="all, delete-orphan")
class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a v2 quantum loadbalancer healthmonitor."""
type = sa.Column(sa.Enum("PING", "TCP", "HTTP", "HTTPS",
name="healthmontiors_type"),
nullable=False)
delay = sa.Column(sa.Integer, nullable=False)
timeout = sa.Column(sa.Integer, nullable=False)
max_retries = sa.Column(sa.Integer, nullable=False)
http_method = sa.Column(sa.String(16))
url_path = sa.Column(sa.String(255))
expected_codes = sa.Column(sa.String(64))
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
class PoolMonitorAssociation(model_base.BASEV2):
"""
Represents the many-to-many association between pool and
healthMonitor classes
"""
pool_id = sa.Column(sa.String(36),
sa.ForeignKey("pools.id"),
primary_key=True)
monitor_id = sa.Column(sa.String(36),
sa.ForeignKey("healthmonitors.id"),
primary_key=True)
monitor = orm.relationship("HealthMonitor",
backref="pools_poolmonitorassociations")
class LoadBalancerPluginDb(LoadBalancerPluginBase):
"""
A class that wraps the implementation of the Quantum
loadbalancer plugin database access interface using SQLAlchemy models.
"""
# TODO(lcui):
# A set of internal facility methods are borrowed from QuantumDbPluginV2
# class and hence this is duplicate. We need to pull out those methods
# into a seperate class which can be used by both QuantumDbPluginV2 and
# this class (and others).
def _get_tenant_id_for_create(self, context, resource):
if context.is_admin and 'tenant_id' in resource:
tenant_id = resource['tenant_id']
elif ('tenant_id' in resource and
resource['tenant_id'] != context.tenant_id):
reason = _('Cannot create resource for another tenant')
raise q_exc.AdminRequired(reason=reason)
else:
tenant_id = context.tenant_id
return tenant_id
def _fields(self, resource, fields):
if fields:
return dict((key, item) for key, item in resource.iteritems()
if key in fields)
return resource
def _apply_filters_to_query(self, query, model, filters):
if filters:
for key, value in filters.iteritems():
column = getattr(model, key, None)
if column:
query = query.filter(column.in_(value))
return query
def _get_collection_query(self, context, model, filters=None):
collection = self._model_query(context, model)
collection = self._apply_filters_to_query(collection, model, filters)
return collection
def _get_collection(self, context, model, dict_func, filters=None,
fields=None, sorts=None, limit=None, marker_obj=None,
page_reverse=False):
query = self._get_collection_query(context, model, filters)
return [dict_func(c, fields) for c in query.all()]
def _get_collection_count(self, context, model, filters=None):
return self._get_collection_query(context, model, filters).count()
def _model_query(self, context, model):
query = context.session.query(model)
query_filter = None
if not context.is_admin and hasattr(model, 'tenant_id'):
if hasattr(model, 'shared'):
query_filter = ((model.tenant_id == context.tenant_id) |
(model.shared))
else:
query_filter = (model.tenant_id == context.tenant_id)
if query_filter is not None:
query = query.filter(query_filter)
return query
def _get_by_id(self, context, model, id):
query = self._model_query(context, model)
return query.filter(model.id == id).one()
def update_status(self, context, model, id, status):
with context.session.begin(subtransactions=True):
v_db = self._get_resource(context, model, id)
v_db.update({'status': status})
def _get_resource(self, context, model, id):
try:
r = self._get_by_id(context, model, id)
except exc.NoResultFound:
if issubclass(model, Vip):
raise loadbalancer.VipNotFound(vip_id=id)
elif issubclass(model, Pool):
raise loadbalancer.PoolNotFound(pool_id=id)
elif issubclass(model, Member):
raise loadbalancer.MemberNotFound(member_id=id)
elif issubclass(model, HealthMonitor):
raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
else:
raise
return r
########################################################
# VIP DB access
def _make_vip_dict(self, vip, fields=None):
res = {'id': vip['id'],
'tenant_id': vip['tenant_id'],
'name': vip['name'],
'description': vip['description'],
'subnet_id': vip['subnet_id'],
'address': vip['address'],
'port': vip['port'],
'protocol': vip['protocol'],
'pool_id': vip['pool_id'],
'connection_limit': vip['connection_limit'],
'admin_state_up': vip['admin_state_up'],
'status': vip['status']}
if vip['session_persistence']:
s_p = {
'type': vip['session_persistence']['type']
}
if vip['session_persistence']['type'] == 'APP_COOKIE':
s_p['cookie_name'] = vip['session_persistence']['cookie_name']
res['session_persistence'] = s_p
return self._fields(res, fields)
def _update_pool_vip_info(self, context, pool_id, vip_id):
pool_db = self._get_resource(context, Pool, pool_id)
with context.session.begin(subtransactions=True):
pool_db.update({'vip_id': vip_id})
def _check_session_persistence_info(self, info):
""" Performs sanity check on session persistence info.
:param info: Session persistence info
"""
if info['type'] == 'APP_COOKIE':
if not info.get('cookie_name'):
raise ValueError(_("'cookie_name' should be specified for this"
" type of session persistence."))
else:
if 'cookie_name' in info:
raise ValueError(_("'cookie_name' is not allowed for this type"
" of session persistence"))
def _create_session_persistence_db(self, session_info, vip_id):
self._check_session_persistence_info(session_info)
sesspersist_db = SessionPersistence(
type=session_info['type'],
cookie_name=session_info.get('cookie_name'),
vip_id=vip_id)
return sesspersist_db
def _update_vip_session_persistence(self, context, vip_id, info):
self._check_session_persistence_info(info)
vip = self._get_resource(context, Vip, vip_id)
with context.session.begin(subtransactions=True):
# Update sessionPersistence table
sess_qry = context.session.query(SessionPersistence)
sesspersist_db = sess_qry.filter_by(vip_id=vip_id).first()
# Insert a None cookie_info if it is not present to overwrite an
# an existing value in the database.
if 'cookie_name' not in info:
info['cookie_name'] = None
if sesspersist_db:
sesspersist_db.update(info)
else:
sesspersist_db = SessionPersistence(
type=info['type'],
cookie_name=info['cookie_name'],
vip_id=vip_id)
context.session.add(sesspersist_db)
# Update vip table
vip.session_persistence = sesspersist_db
context.session.add(vip)
def _delete_session_persistence(self, context, vip_id):
with context.session.begin(subtransactions=True):
sess_qry = context.session.query(SessionPersistence)
sess_qry.filter_by(vip_id=vip_id).delete()
def create_vip(self, context, vip):
v = vip['vip']
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
if v['address'] is attributes.ATTR_NOT_SPECIFIED:
address = None
else:
address = v['address']
vip_db = Vip(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=v['name'],
description=v['description'],
subnet_id=v['subnet_id'],
address=address,
port=v['port'],
protocol=v['protocol'],
pool_id=v['pool_id'],
connection_limit=v['connection_limit'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
vip_id = vip_db['id']
session_info = v['session_persistence']
if session_info:
s_p = self._create_session_persistence_db(session_info, vip_id)
vip_db.session_persistence = s_p
context.session.add(vip_db)
self._update_pool_vip_info(context, v['pool_id'], vip_id)
vip_db = self._get_resource(context, Vip, vip_id)
return self._make_vip_dict(vip_db)
def update_vip(self, context, id, vip):
v = vip['vip']
sess_persist = v.pop('session_persistence', None)
with context.session.begin(subtransactions=True):
if sess_persist:
self._update_vip_session_persistence(context, id, sess_persist)
else:
self._delete_session_persistence(context, id)
vip_db = self._get_resource(context, Vip, id)
old_pool_id = vip_db['pool_id']
if v:
vip_db.update(v)
# If the pool_id is changed, we need to update
# the associated pools
if 'pool_id' in v:
self._update_pool_vip_info(context, old_pool_id, None)
self._update_pool_vip_info(context, v['pool_id'], id)
return self._make_vip_dict(vip_db)
def delete_vip(self, context, id):
with context.session.begin(subtransactions=True):
vip = self._get_resource(context, Vip, id)
qry = context.session.query(Pool)
for pool in qry.filter_by(vip_id=id).all():
pool.update({"vip_id": None})
context.session.delete(vip)
def get_vip(self, context, id, fields=None):
vip = self._get_resource(context, Vip, id)
return self._make_vip_dict(vip, fields)
def get_vips(self, context, filters=None, fields=None):
return self._get_collection(context, Vip,
self._make_vip_dict,
filters=filters, fields=fields)
########################################################
# Pool DB access
def _make_pool_dict(self, context, pool, fields=None):
res = {'id': pool['id'],
'tenant_id': pool['tenant_id'],
'name': pool['name'],
'description': pool['description'],
'subnet_id': pool['subnet_id'],
'protocol': pool['protocol'],
'vip_id': pool['vip_id'],
'lb_method': pool['lb_method'],
'admin_state_up': pool['admin_state_up'],
'status': pool['status']}
# Get the associated members
res['members'] = [member['id'] for member in pool['members']]
# Get the associated health_monitors
res['health_monitors'] = [
monitor['monitor_id'] for monitor in pool['monitors']]
return self._fields(res, fields)
def _update_pool_member_info(self, context, pool_id, membersInfo):
with context.session.begin(subtransactions=True):
member_qry = context.session.query(Member)
for member_id in membersInfo:
try:
member = member_qry.filter_by(id=member_id).one()
member.update({'pool_id': pool_id})
except exc.NoResultFound:
raise loadbalancer.MemberNotFound(member_id=member_id)
def _create_pool_stats(self, context, pool_id):
# This is internal method to add pool statistics. It won't
# be exposed to API
stats_db = PoolStatistics(
pool_id=pool_id,
bytes_in=0,
bytes_out=0,
active_connections=0,
total_connections=0
)
return stats_db
def _delete_pool_stats(self, context, pool_id):
# This is internal method to delete pool statistics. It won't
# be exposed to API
with context.session.begin(subtransactions=True):
stats_qry = context.session.query(PoolStatistics)
try:
stats = stats_qry.filter_by(pool_id=pool_id).one()
except exc.NoResultFound:
raise loadbalancer.PoolStatsNotFound(pool_id=pool_id)
context.session.delete(stats)
def create_pool(self, context, pool):
v = pool['pool']
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
pool_db = Pool(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=v['name'],
description=v['description'],
subnet_id=v['subnet_id'],
protocol=v['protocol'],
lb_method=v['lb_method'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
pool_db.stats = self._create_pool_stats(context, pool_db['id'])
context.session.add(pool_db)
pool_db = self._get_resource(context, Pool, pool_db['id'])
return self._make_pool_dict(context, pool_db)
def update_pool(self, context, id, pool):
v = pool['pool']
with context.session.begin(subtransactions=True):
pool_db = self._get_resource(context, Pool, id)
if v:
pool_db.update(v)
return self._make_pool_dict(context, pool_db)
def delete_pool(self, context, id):
# Check if the pool is in use
vip = context.session.query(Vip).filter_by(pool_id=id).first()
if vip:
raise loadbalancer.PoolInUse(pool_id=id)
with context.session.begin(subtransactions=True):
self._delete_pool_stats(context, id)
pool_db = self._get_resource(context, Pool, id)
context.session.delete(pool_db)
def get_pool(self, context, id, fields=None):
pool = self._get_resource(context, Pool, id)
return self._make_pool_dict(context, pool, fields)
def get_pools(self, context, filters=None, fields=None):
collection = self._model_query(context, Pool)
collection = self._apply_filters_to_query(collection, Pool, filters)
return [self._make_pool_dict(context, c, fields)
for c in collection.all()]
def get_stats(self, context, pool_id):
with context.session.begin(subtransactions=True):
pool_qry = context.session.query(Pool)
try:
pool = pool_qry.filter_by(id=pool_id).one()
stats = pool['stats']
except exc.NoResultFound:
raise loadbalancer.PoolStatsNotFound(pool_id=pool_id)
res = {'bytes_in': stats['bytes_in'],
'bytes_out': stats['bytes_out'],
'active_connections': stats['active_connections'],
'total_connections': stats['total_connections']}
return {'stats': res}
def create_pool_health_monitor(self, context, health_monitor, pool_id):
monitor_id = health_monitor['health_monitor']['id']
with context.session.begin(subtransactions=True):
monitor_qry = context.session.query(HealthMonitor)
try:
monitor = monitor_qry.filter_by(id=monitor_id).one()
monitor.update({'pool_id': pool_id})
except exc.NoResultFound:
raise loadbalancer.HealthMonitorNotFound(monitor_id=monitor_id)
try:
qry = context.session.query(Pool)
pool = qry.filter_by(id=pool_id).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=pool_id)
assoc = PoolMonitorAssociation(pool_id=pool_id,
monitor_id=monitor_id)
assoc.monitor = monitor
pool.monitors.append(assoc)
monitors = []
try:
qry = context.session.query(Pool)
pool = qry.filter_by(id=pool_id).one()
for monitor in pool['monitors']:
monitors.append(monitor['monitor_id'])
except exc.NoResultFound:
pass
res = {"health_monitor": monitors}
return res
def delete_pool_health_monitor(self, context, id, pool_id):
with context.session.begin(subtransactions=True):
try:
pool_qry = context.session.query(Pool)
pool = pool_qry.filter_by(id=pool_id).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=pool_id)
try:
monitor_qry = context.session.query(PoolMonitorAssociation)
monitor = monitor_qry.filter_by(monitor_id=id,
pool_id=pool_id).one()
pool.monitors.remove(monitor)
except exc.NoResultFound:
raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
healthmonitor = self._get_resource(context, HealthMonitor, id)
return self._make_health_monitor_dict(healthmonitor, fields)
########################################################
# Member DB access
def _make_member_dict(self, member, fields=None):
res = {'id': member['id'],
'tenant_id': member['tenant_id'],
'pool_id': member['pool_id'],
'address': member['address'],
'port': member['port'],
'weight': member['weight'],
'admin_state_up': member['admin_state_up'],
'status': member['status']}
return self._fields(res, fields)
def create_member(self, context, member):
v = member['member']
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
pool = None
try:
qry = context.session.query(Pool)
pool = qry.filter_by(id=v['pool_id']).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=v['pool_id'])
member_db = Member(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
pool_id=v['pool_id'],
address=v['address'],
port=v['port'],
weight=v['weight'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
context.session.add(member_db)
return self._make_member_dict(member_db)
def update_member(self, context, id, member):
v = member['member']
with context.session.begin(subtransactions=True):
member_db = self._get_resource(context, Member, id)
old_pool_id = member_db['pool_id']
if v:
member_db.update(v)
return self._make_member_dict(member_db)
def delete_member(self, context, id):
with context.session.begin(subtransactions=True):
member_db = self._get_resource(context, Member, id)
context.session.delete(member_db)
def get_member(self, context, id, fields=None):
member = self._get_resource(context, Member, id)
return self._make_member_dict(member, fields)
def get_members(self, context, filters=None, fields=None):
return self._get_collection(context, Member,
self._make_member_dict,
filters=filters, fields=fields)
########################################################
# HealthMonitor DB access
def _make_health_monitor_dict(self, health_monitor, fields=None):
res = {'id': health_monitor['id'],
'tenant_id': health_monitor['tenant_id'],
'type': health_monitor['type'],
'delay': health_monitor['delay'],
'timeout': health_monitor['timeout'],
'max_retries': health_monitor['max_retries'],
'admin_state_up': health_monitor['admin_state_up'],
'status': health_monitor['status']}
# no point to add the values below to
# the result if the 'type' is not HTTP/S
if res['type'] in ['HTTP', 'HTTPS']:
for attr in ['url_path', 'http_method', 'expected_codes']:
res[attr] = health_monitor[attr]
return self._fields(res, fields)
def create_health_monitor(self, context, health_monitor):
v = health_monitor['health_monitor']
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
monitor_db = HealthMonitor(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
type=v['type'],
delay=v['delay'],
timeout=v['timeout'],
max_retries=v['max_retries'],
http_method=v['http_method'],
url_path=v['url_path'],
expected_codes=v['expected_codes'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
context.session.add(monitor_db)
return self._make_health_monitor_dict(monitor_db)
def update_health_monitor(self, context, id, health_monitor):
v = health_monitor['health_monitor']
with context.session.begin(subtransactions=True):
monitor_db = self._get_resource(context, HealthMonitor, id)
if v:
monitor_db.update(v)
return self._make_health_monitor_dict(monitor_db)
def delete_health_monitor(self, context, id):
with context.session.begin(subtransactions=True):
assoc_qry = context.session.query(PoolMonitorAssociation)
pool_qry = context.session.query(Pool)
for assoc in assoc_qry.filter_by(monitor_id=id).all():
try:
pool = pool_qry.filter_by(id=assoc['pool_id']).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=pool['id'])
pool.monitors.remove(assoc)
monitor_db = self._get_resource(context, HealthMonitor, id)
context.session.delete(monitor_db)
def get_health_monitor(self, context, id, fields=None):
healthmonitor = self._get_resource(context, HealthMonitor, id)
return self._make_health_monitor_dict(healthmonitor, fields)
def get_health_monitors(self, context, filters=None, fields=None):
return self._get_collection(context, HealthMonitor,
self._make_health_monitor_dict,
filters=filters, fields=fields)