22a6efb13a
Fixes bug 1096205 Change-Id: Icb34b567a8630bb7247b2bb2d6efaf53269bf84b
283 lines
11 KiB
Python
283 lines
11 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 Nicira, Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
|
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
|
|
|
|
|
from abc import ABCMeta
|
|
import httplib
|
|
import time
|
|
import logging
|
|
|
|
|
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
|
|
_conn_str)
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
LOG = logging.getLogger(__name__)
|
|
#Default parameters.
|
|
GENERATION_ID_TIMEOUT = -1
|
|
DEFAULT_CONCURRENT_CONNECTIONS = 3
|
|
DEFAULT_CONNECT_TIMEOUT = 5
|
|
|
|
|
|
class NvpApiClient(object):
|
|
'''An abstract baseclass for all NvpApiClient implementations.
|
|
|
|
This defines the interface and property structure for synchronous and
|
|
coroutine-based classes.
|
|
'''
|
|
|
|
__metaclass__ = ABCMeta
|
|
|
|
CONN_IDLE_TIMEOUT = 60 * 15
|
|
|
|
def _create_connection(self, host, port, is_ssl):
|
|
if is_ssl:
|
|
return httplib.HTTPSConnection(host, port,
|
|
timeout=self._connect_timeout)
|
|
return httplib.HTTPConnection(host, port,
|
|
timeout=self._connect_timeout)
|
|
|
|
@staticmethod
|
|
def _conn_params(http_conn):
|
|
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
|
|
return (http_conn.host, http_conn.port, is_ssl)
|
|
|
|
@property
|
|
def user(self):
|
|
return self._user
|
|
|
|
@property
|
|
def password(self):
|
|
return self._password
|
|
|
|
@property
|
|
def nvp_config_gen(self):
|
|
# If nvp_gen_timeout is not -1 then:
|
|
# Maintain a timestamp along with the generation ID. Hold onto the
|
|
# ID long enough to be useful and block on sequential requests but
|
|
# not long enough to persist when Onix db is cleared, which resets
|
|
# the generation ID, causing the DAL to block indefinitely with some
|
|
# number that's higher than the cluster's value.
|
|
if self._nvp_gen_timeout != -1:
|
|
ts = self._nvp_config_gen_ts
|
|
if ts is not None:
|
|
if (time.time() - ts) > self._nvp_gen_timeout:
|
|
return None
|
|
return self._nvp_config_gen
|
|
|
|
@nvp_config_gen.setter
|
|
def nvp_config_gen(self, value):
|
|
if self._nvp_config_gen != value:
|
|
if self._nvp_gen_timeout != -1:
|
|
self._nvp_config_gen_ts = time.time()
|
|
self._nvp_config_gen = value
|
|
|
|
def auth_cookie(self, conn):
|
|
cookie = None
|
|
data = self._get_provider_data(conn)
|
|
if data:
|
|
cookie = data[1]
|
|
return cookie
|
|
|
|
def set_auth_cookie(self, conn, cookie):
|
|
data = self._get_provider_data(conn)
|
|
if data:
|
|
self._set_provider_data(conn, (data[0], cookie))
|
|
|
|
def acquire_connection(self, auto_login=True, headers=None, rid=-1):
|
|
'''Check out an available HTTPConnection instance.
|
|
|
|
Blocks until a connection is available.
|
|
:auto_login: automatically logins before returning conn
|
|
:headers: header to pass on to login attempt
|
|
:param rid: request id passed in from request eventlet.
|
|
:returns: An available HTTPConnection instance or None if no
|
|
api_providers are configured.
|
|
'''
|
|
if not self._api_providers:
|
|
LOG.warn(_("[%d] no API providers currently available."), rid)
|
|
return None
|
|
if self._conn_pool.empty():
|
|
LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
|
|
priority, conn = self._conn_pool.get()
|
|
now = time.time()
|
|
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
|
|
LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
|
|
"seconds; reconnecting."),
|
|
{'rid': rid, 'conn': _conn_str(conn),
|
|
'sec': now - conn.last_used})
|
|
conn = self._create_connection(*self._conn_params(conn))
|
|
|
|
conn.last_used = now
|
|
conn.priority = priority # stash current priority for release
|
|
qsize = self._conn_pool.qsize()
|
|
LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
|
|
"connection(s) available."),
|
|
{'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
|
|
if auto_login and self.auth_cookie(conn) is None:
|
|
self._wait_for_login(conn, headers)
|
|
return conn
|
|
|
|
def release_connection(self, http_conn, bad_state=False,
|
|
service_unavail=False, rid=-1):
|
|
'''Mark HTTPConnection instance as available for check-out.
|
|
|
|
:param http_conn: An HTTPConnection instance obtained from this
|
|
instance.
|
|
:param bad_state: True if http_conn is known to be in a bad state
|
|
(e.g. connection fault.)
|
|
:service_unavail: True if http_conn returned 503 response.
|
|
:param rid: request id passed in from request eventlet.
|
|
'''
|
|
conn_params = self._conn_params(http_conn)
|
|
if self._conn_params(http_conn) not in self._api_providers:
|
|
LOG.debug(_("[%(rid)d] Released connection %(conn)s is not an "
|
|
"API provider for the cluster"),
|
|
{'rid': rid, 'conn': _conn_str(http_conn)})
|
|
return
|
|
elif hasattr(http_conn, "no_release"):
|
|
return
|
|
|
|
if bad_state:
|
|
# Reconnect to provider.
|
|
LOG.warn(_("[%(rid)d] Connection returned in bad state, "
|
|
"reconnecting to %(conn)s"),
|
|
{'rid': rid, 'conn': _conn_str(http_conn)})
|
|
http_conn = self._create_connection(*self._conn_params(http_conn))
|
|
priority = self._next_conn_priority
|
|
self._next_conn_priority += 1
|
|
elif service_unavail:
|
|
# http_conn returned a service unaviable response, put other
|
|
# connections to the same controller at end of priority queue,
|
|
conns = []
|
|
while not self._conn_pool.empty():
|
|
priority, conn = self._conn_pool.get()
|
|
if self._conn_params(conn) == conn_params:
|
|
priority = self._next_conn_priority
|
|
self._next_conn_priority += 1
|
|
conns.append((priority, conn))
|
|
for priority, conn in conns:
|
|
self._conn_pool.put((priority, conn))
|
|
# put http_conn at end of queue also
|
|
priority = self._next_conn_priority
|
|
self._next_conn_priority += 1
|
|
else:
|
|
priority = http_conn.priority
|
|
|
|
self._conn_pool.put((priority, http_conn))
|
|
LOG.debug(_("[%(rid)d] Released connection %(conn)s. %(qsize)d "
|
|
"connection(s) available."),
|
|
{'rid': rid, 'conn': _conn_str(http_conn),
|
|
'qsize': self._conn_pool.qsize()})
|
|
|
|
def _wait_for_login(self, conn, headers=None):
|
|
'''Block until a login has occurred for the current API provider.'''
|
|
|
|
data = self._get_provider_data(conn)
|
|
if data is None:
|
|
LOG.error(_("Login request for an invalid connection: '%s'"),
|
|
_conn_str(conn))
|
|
return
|
|
provider_sem = data[0]
|
|
if provider_sem.acquire(blocking=False):
|
|
try:
|
|
cookie = self._login(conn, headers)
|
|
self.set_auth_cookie(conn, cookie)
|
|
finally:
|
|
provider_sem.release()
|
|
else:
|
|
LOG.debug(_("Waiting for auth to complete"))
|
|
# Wait until we can aquire then release
|
|
provider_sem.acquire(blocking=True)
|
|
provider_sem.release()
|
|
|
|
def _get_provider_data(self, conn_or_conn_params, default=None):
|
|
"""Get data for specified API provider.
|
|
|
|
Args:
|
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
|
resolved conn_params tuple returned by self._conn_params().
|
|
default: conn_params if ones passed aren't known
|
|
Returns: Data associated with specified provider
|
|
"""
|
|
conn_params = self._normalize_conn_params(conn_or_conn_params)
|
|
return self._api_provider_data.get(conn_params, default)
|
|
|
|
def _set_provider_data(self, conn_or_conn_params, data):
|
|
"""Set data for specified API provider.
|
|
|
|
Args:
|
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
|
resolved conn_params tuple returned by self._conn_params().
|
|
data: data to associate with API provider
|
|
"""
|
|
conn_params = self._normalize_conn_params(conn_or_conn_params)
|
|
if data is None:
|
|
del self._api_provider_data[conn_params]
|
|
else:
|
|
self._api_provider_data[conn_params] = data
|
|
|
|
def _normalize_conn_params(self, conn_or_conn_params):
|
|
"""Normalize conn_param tuple.
|
|
|
|
Args:
|
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
|
resolved conn_params tuple returned by self._conn_params().
|
|
|
|
Returns: Normalized conn_param tuple
|
|
"""
|
|
if (not isinstance(conn_or_conn_params, tuple) and
|
|
not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
|
|
LOG.debug(_("Invalid conn_params value: '%s'"),
|
|
str(conn_or_conn_params))
|
|
return conn_or_conn_params
|
|
if isinstance(conn_or_conn_params, httplib.HTTPConnection):
|
|
conn_params = self._conn_params(conn_or_conn_params)
|
|
else:
|
|
conn_params = conn_or_conn_params
|
|
host, port, is_ssl = conn_params
|
|
if port is None:
|
|
port = 443 if is_ssl else 80
|
|
return (host, port, is_ssl)
|
|
|
|
def update_providers(self, api_providers):
|
|
new_providers = set([tuple(p) for p in api_providers])
|
|
if new_providers != self._api_providers:
|
|
new_conns = []
|
|
while not self._conn_pool.empty():
|
|
priority, conn = self._conn_pool.get_nowait()
|
|
if self._conn_params(conn) in new_providers:
|
|
new_conns.append((priority, conn))
|
|
|
|
to_subtract = self._api_providers - new_providers
|
|
for p in to_subtract:
|
|
self._set_provider_data(p, None)
|
|
to_add = new_providers - self._api_providers
|
|
for unused_i in range(self._concurrent_connections):
|
|
for host, port, is_ssl in to_add:
|
|
conn = self._create_connection(host, port, is_ssl)
|
|
new_conns.append((self._next_conn_priority, conn))
|
|
self._next_conn_priority += 1
|
|
|
|
for priority, conn in new_conns:
|
|
self._conn_pool.put((priority, conn))
|
|
self._api_providers = new_providers
|