Allow nvp_api to load balance requests
The current version of the nvp_api client does not load balance requests across multiple controllers. Instead, it just sends all the requests to one controller and if there is a controller failure it will failover to use another controller. This blueprint implements the ablility to utilize all controllers at once. blueprint nvp-api-client-loadbalance-request Change-Id: I331be2a23ae360a95786152d5f116359f690d9f3
This commit is contained in:
parent
1d2784651b
commit
300486546b
@ -36,8 +36,8 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
|
|||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, api_providers, user, password, request_timeout,
|
def __init__(self, api_providers, user, password, request_timeout,
|
||||||
http_timeout, retries, redirects, failover_time,
|
http_timeout, retries, redirects,
|
||||||
concurrent_connections=3):
|
concurrent_connections=3, nvp_gen_timeout=-1):
|
||||||
'''Constructor.
|
'''Constructor.
|
||||||
|
|
||||||
:param api_providers: a list of tuples in the form:
|
:param api_providers: a list of tuples in the form:
|
||||||
@ -53,12 +53,10 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
|
|||||||
controller in the cluster)
|
controller in the cluster)
|
||||||
:param retries: the number of concurrent connections.
|
:param retries: the number of concurrent connections.
|
||||||
:param redirects: the number of concurrent connections.
|
:param redirects: the number of concurrent connections.
|
||||||
:param failover_time: minimum time between controller failover and new
|
|
||||||
connections allowed.
|
|
||||||
'''
|
'''
|
||||||
client_eventlet.NvpApiClientEventlet.__init__(
|
client_eventlet.NvpApiClientEventlet.__init__(
|
||||||
self, api_providers, user, password, concurrent_connections,
|
self, api_providers, user, password, concurrent_connections,
|
||||||
failover_time=failover_time)
|
nvp_gen_timeout)
|
||||||
|
|
||||||
self._request_timeout = request_timeout
|
self._request_timeout = request_timeout
|
||||||
self._http_timeout = http_timeout
|
self._http_timeout = http_timeout
|
||||||
@ -84,7 +82,7 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
|
|||||||
if password:
|
if password:
|
||||||
self._password = password
|
self._password = password
|
||||||
|
|
||||||
return client_eventlet.NvpApiClientEventlet.login(self)
|
return client_eventlet.NvpApiClientEventlet._login(self)
|
||||||
|
|
||||||
def request(self, method, url, body="", content_type="application/json"):
|
def request(self, method, url, body="", content_type="application/json"):
|
||||||
'''Issues request to controller.'''
|
'''Issues request to controller.'''
|
||||||
|
@ -27,7 +27,6 @@ import webob.exc
|
|||||||
|
|
||||||
# FIXME(salvatore-orlando): get rid of relative imports
|
# FIXME(salvatore-orlando): get rid of relative imports
|
||||||
from common import config
|
from common import config
|
||||||
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client_eventlet
|
|
||||||
from nvp_plugin_version import PLUGIN_VERSION
|
from nvp_plugin_version import PLUGIN_VERSION
|
||||||
|
|
||||||
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models
|
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models
|
||||||
@ -157,12 +156,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
|
|||||||
http_timeout=cluster.http_timeout,
|
http_timeout=cluster.http_timeout,
|
||||||
retries=cluster.retries,
|
retries=cluster.retries,
|
||||||
redirects=cluster.redirects,
|
redirects=cluster.redirects,
|
||||||
failover_time=self.nvp_opts.failover_time,
|
concurrent_connections=self.nvp_opts['concurrent_connections'],
|
||||||
concurrent_connections=self.nvp_opts.concurrent_connections)
|
nvp_gen_timeout=self.nvp_opts['nvp_gen_timeout'])
|
||||||
|
|
||||||
# TODO(salvatore-orlando): do login at first request,
|
if len(self.clusters) == 0:
|
||||||
# not when plugin is instantiated
|
first_cluster = cluster
|
||||||
cluster.api_client.login()
|
|
||||||
self.clusters[c_opts['name']] = cluster
|
self.clusters[c_opts['name']] = cluster
|
||||||
|
|
||||||
def_cluster_name = self.nvp_opts.default_cluster_name
|
def_cluster_name = self.nvp_opts.default_cluster_name
|
||||||
|
@ -21,8 +21,8 @@ NVP Plugin configuration
|
|||||||
bridged transport zone (default 64)
|
bridged transport zone (default 64)
|
||||||
- concurrent_connections: Number of connects to each controller node
|
- concurrent_connections: Number of connects to each controller node
|
||||||
(default 3)
|
(default 3)
|
||||||
- failover_time: Time from when a connection pool is switched to another
|
- nvp_gen_timout: Number of seconds a generation id should be valid for
|
||||||
controller during failures.
|
(default -1 meaning do not time out)
|
||||||
3) NVP cluster
|
3) NVP cluster
|
||||||
The Quantum NVP plugin allow for configuring multiple clusters.
|
The Quantum NVP plugin allow for configuring multiple clusters.
|
||||||
Each cluster configuration section must be declared in the following way
|
Each cluster configuration section must be declared in the following way
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
# Copyright 2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
@ -14,3 +14,5 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
@ -1,25 +1,40 @@
|
|||||||
# Copyright 2009-2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
# a copy of the License at
|
# a copy of the License at
|
||||||
#
|
#
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
#
|
#
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
#
|
||||||
# Author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
|
# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
|
||||||
|
|
||||||
from abc import ABCMeta
|
from abc import ABCMeta
|
||||||
from abc import abstractmethod
|
import httplib
|
||||||
from abc import abstractproperty
|
import time
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
|
||||||
|
_conn_str)
|
||||||
|
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
#Default parameters.
|
||||||
|
GENERATION_ID_TIMEOUT = -1
|
||||||
|
DEFAULT_CONCURRENT_CONNECTIONS = 3
|
||||||
|
DEFAULT_CONNECT_TIMEOUT = 5
|
||||||
|
|
||||||
|
|
||||||
class NvpApiClient(object):
|
class NvpApiClient(object):
|
||||||
@ -33,38 +48,230 @@ class NvpApiClient(object):
|
|||||||
|
|
||||||
CONN_IDLE_TIMEOUT = 60 * 15
|
CONN_IDLE_TIMEOUT = 60 * 15
|
||||||
|
|
||||||
@abstractmethod
|
def _create_connection(self, host, port, is_ssl):
|
||||||
def update_providers(self, api_providers):
|
if is_ssl:
|
||||||
pass
|
return httplib.HTTPSConnection(host, port,
|
||||||
|
timeout=self._connect_timeout)
|
||||||
|
return httplib.HTTPConnection(host, port,
|
||||||
|
timeout=self._connect_timeout)
|
||||||
|
|
||||||
@abstractproperty
|
@staticmethod
|
||||||
|
def _conn_params(http_conn):
|
||||||
|
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
|
||||||
|
return (http_conn.host, http_conn.port, is_ssl)
|
||||||
|
|
||||||
|
@property
|
||||||
def user(self):
|
def user(self):
|
||||||
pass
|
return self._user
|
||||||
|
|
||||||
@abstractproperty
|
@property
|
||||||
def password(self):
|
def password(self):
|
||||||
pass
|
return self._password
|
||||||
|
|
||||||
@abstractproperty
|
@property
|
||||||
def auth_cookie(self):
|
def nvp_config_gen(self):
|
||||||
pass
|
# If nvp_gen_timeout is not -1 then:
|
||||||
|
# Maintain a timestamp along with the generation ID. Hold onto the
|
||||||
|
# ID long enough to be useful and block on sequential requests but
|
||||||
|
# not long enough to persist when Onix db is cleared, which resets
|
||||||
|
# the generation ID, causing the DAL to block indefinitely with some
|
||||||
|
# number that's higher than the cluster's value.
|
||||||
|
if self._nvp_gen_timeout != -1:
|
||||||
|
ts = self._nvp_config_gen_ts
|
||||||
|
if ts is not None:
|
||||||
|
if (time.time() - ts) > self._nvp_gen_timeout:
|
||||||
|
return None
|
||||||
|
return self._nvp_config_gen
|
||||||
|
|
||||||
@abstractmethod
|
@nvp_config_gen.setter
|
||||||
def acquire_connection(self):
|
def nvp_config_gen(self, value):
|
||||||
pass
|
if self._nvp_config_gen != value:
|
||||||
|
if self._nvp_gen_timeout != -1:
|
||||||
|
self._nvp_config_gen_ts = time.time()
|
||||||
|
self._nvp_config_gen = value
|
||||||
|
|
||||||
@abstractmethod
|
def auth_cookie(self, conn):
|
||||||
def release_connection(self, http_conn, bad_state=False):
|
cookie = None
|
||||||
pass
|
data = self._get_provider_data(conn)
|
||||||
|
if data:
|
||||||
|
cookie = data[1]
|
||||||
|
return cookie
|
||||||
|
|
||||||
@abstractproperty
|
def set_auth_cookie(self, conn, cookie):
|
||||||
def need_login(self):
|
data = self._get_provider_data(conn)
|
||||||
pass
|
if data:
|
||||||
|
self._set_provider_data(conn, (data[0], cookie))
|
||||||
|
|
||||||
@abstractmethod
|
def acquire_connection(self, auto_login=True, headers=None, rid=-1):
|
||||||
def wait_for_login(self):
|
'''Check out an available HTTPConnection instance.
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
Blocks until a connection is available.
|
||||||
def login(self):
|
:auto_login: automatically logins before returning conn
|
||||||
pass
|
:headers: header to pass on to login attempt
|
||||||
|
:param rid: request id passed in from request eventlet.
|
||||||
|
:returns: An available HTTPConnection instance or None if no
|
||||||
|
api_providers are configured.
|
||||||
|
'''
|
||||||
|
if not self._api_providers:
|
||||||
|
LOG.warn(_("[%d] no API providers currently available."), rid)
|
||||||
|
return None
|
||||||
|
if self._conn_pool.empty():
|
||||||
|
LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
|
||||||
|
priority, conn = self._conn_pool.get()
|
||||||
|
now = time.time()
|
||||||
|
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
|
||||||
|
LOG.info(_("[%d] Connection %s idle for %0.2f seconds; "
|
||||||
|
"reconnecting."),
|
||||||
|
rid, _conn_str(conn), now - conn.last_used)
|
||||||
|
conn = self._create_connection(*self._conn_params(conn))
|
||||||
|
|
||||||
|
conn.last_used = now
|
||||||
|
conn.priority = priority # stash current priority for release
|
||||||
|
qsize = self._conn_pool.qsize()
|
||||||
|
LOG.debug(_("[%d] Acquired connection %s. %d connection(s) "
|
||||||
|
"available."), rid, _conn_str(conn), qsize)
|
||||||
|
if auto_login and self.auth_cookie(conn) is None:
|
||||||
|
self._wait_for_login(conn, headers)
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def release_connection(self, http_conn, bad_state=False,
|
||||||
|
service_unavail=False, rid=-1):
|
||||||
|
'''Mark HTTPConnection instance as available for check-out.
|
||||||
|
|
||||||
|
:param http_conn: An HTTPConnection instance obtained from this
|
||||||
|
instance.
|
||||||
|
:param bad_state: True if http_conn is known to be in a bad state
|
||||||
|
(e.g. connection fault.)
|
||||||
|
:service_unavail: True if http_conn returned 503 response.
|
||||||
|
:param rid: request id passed in from request eventlet.
|
||||||
|
'''
|
||||||
|
conn_params = self._conn_params(http_conn)
|
||||||
|
if self._conn_params(http_conn) not in self._api_providers:
|
||||||
|
LOG.debug(_("[%d] Released connection '%s' is not an API provider "
|
||||||
|
"for the cluster"), rid, _conn_str(http_conn))
|
||||||
|
return
|
||||||
|
elif hasattr(http_conn, "no_release"):
|
||||||
|
return
|
||||||
|
|
||||||
|
if bad_state:
|
||||||
|
# Reconnect to provider.
|
||||||
|
LOG.warn(_("[%d] Connection returned in bad state, reconnecting "
|
||||||
|
"to %s"), rid, _conn_str(http_conn))
|
||||||
|
http_conn = self._create_connection(*self._conn_params(http_conn))
|
||||||
|
priority = self._next_conn_priority
|
||||||
|
self._next_conn_priority += 1
|
||||||
|
elif service_unavail:
|
||||||
|
# http_conn returned a service unaviable response, put other
|
||||||
|
# connections to the same controller at end of priority queue,
|
||||||
|
conns = []
|
||||||
|
while not self._conn_pool.empty():
|
||||||
|
priority, conn = self._conn_pool.get()
|
||||||
|
if self._conn_params(conn) == conn_params:
|
||||||
|
priority = self._next_conn_priority
|
||||||
|
self._next_conn_priority += 1
|
||||||
|
conns.append((priority, conn))
|
||||||
|
for priority, conn in conns:
|
||||||
|
self._conn_pool.put((priority, conn))
|
||||||
|
# put http_conn at end of queue also
|
||||||
|
priority = self._next_conn_priority
|
||||||
|
self._next_conn_priority += 1
|
||||||
|
else:
|
||||||
|
priority = http_conn.priority
|
||||||
|
|
||||||
|
self._conn_pool.put((priority, http_conn))
|
||||||
|
LOG.debug(_("[%d] Released connection %s. %d connection(s) "
|
||||||
|
"available."),
|
||||||
|
rid, _conn_str(http_conn), self._conn_pool.qsize())
|
||||||
|
|
||||||
|
def _wait_for_login(self, conn, headers=None):
|
||||||
|
'''Block until a login has occurred for the current API provider.'''
|
||||||
|
|
||||||
|
data = self._get_provider_data(conn)
|
||||||
|
if data is None:
|
||||||
|
LOG.error(_("Login request for an invalid connection: '%s'"),
|
||||||
|
_conn_str(conn))
|
||||||
|
return
|
||||||
|
provider_sem = data[0]
|
||||||
|
if provider_sem.acquire(blocking=False):
|
||||||
|
try:
|
||||||
|
cookie = self._login(conn, headers)
|
||||||
|
self.set_auth_cookie(conn, cookie)
|
||||||
|
finally:
|
||||||
|
provider_sem.release()
|
||||||
|
else:
|
||||||
|
LOG.debug(_("Waiting for auth to complete"))
|
||||||
|
# Wait until we can aquire then release
|
||||||
|
provider_sem.acquire(blocking=True)
|
||||||
|
provider_sem.release()
|
||||||
|
|
||||||
|
def _get_provider_data(self, conn_or_conn_params, default=None):
|
||||||
|
"""Get data for specified API provider.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
||||||
|
resolved conn_params tuple returned by self._conn_params().
|
||||||
|
default: conn_params if ones passed aren't known
|
||||||
|
Returns: Data associated with specified provider
|
||||||
|
"""
|
||||||
|
conn_params = self._normalize_conn_params(conn_or_conn_params)
|
||||||
|
return self._api_provider_data.get(conn_params, default)
|
||||||
|
|
||||||
|
def _set_provider_data(self, conn_or_conn_params, data):
|
||||||
|
"""Set data for specified API provider.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
||||||
|
resolved conn_params tuple returned by self._conn_params().
|
||||||
|
data: data to associate with API provider
|
||||||
|
"""
|
||||||
|
conn_params = self._normalize_conn_params(conn_or_conn_params)
|
||||||
|
if data is None:
|
||||||
|
del self._api_provider_data[conn_params]
|
||||||
|
else:
|
||||||
|
self._api_provider_data[conn_params] = data
|
||||||
|
|
||||||
|
def _normalize_conn_params(self, conn_or_conn_params):
|
||||||
|
"""Normalize conn_param tuple.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conn_or_conn_params: either a HTTP(S)Connection object or the
|
||||||
|
resolved conn_params tuple returned by self._conn_params().
|
||||||
|
|
||||||
|
Returns: Normalized conn_param tuple
|
||||||
|
"""
|
||||||
|
if (not isinstance(conn_or_conn_params, tuple) and
|
||||||
|
not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
|
||||||
|
LOG.debug(_("Invalid conn_params value: '%s'"),
|
||||||
|
str(conn_or_conn_params))
|
||||||
|
return conn_or_conn_params
|
||||||
|
if isinstance(conn_or_conn_params, httplib.HTTPConnection):
|
||||||
|
conn_params = self._conn_params(conn_or_conn_params)
|
||||||
|
else:
|
||||||
|
conn_params = conn_or_conn_params
|
||||||
|
host, port, is_ssl = conn_params
|
||||||
|
if port is None:
|
||||||
|
port = 443 if is_ssl else 80
|
||||||
|
return (host, port, is_ssl)
|
||||||
|
|
||||||
|
def update_providers(self, api_providers):
|
||||||
|
new_providers = set([tuple(p) for p in api_providers])
|
||||||
|
if new_providers != self._api_providers:
|
||||||
|
new_conns = []
|
||||||
|
while not self._conn_pool.empty():
|
||||||
|
priority, conn = self._conn_pool.get_nowait()
|
||||||
|
if self._conn_params(conn) in new_providers:
|
||||||
|
new_conns.append((priority, conn))
|
||||||
|
|
||||||
|
to_subtract = self._api_providers - new_providers
|
||||||
|
for p in to_subtract:
|
||||||
|
self._set_provider_data(p, None)
|
||||||
|
to_add = new_providers - self._api_providers
|
||||||
|
for unused_i in range(self._concurrent_connections):
|
||||||
|
for host, port, is_ssl in to_add:
|
||||||
|
conn = self._create_connection(host, port, is_ssl)
|
||||||
|
new_conns.append((self._next_conn_priority, conn))
|
||||||
|
self._next_conn_priority += 1
|
||||||
|
|
||||||
|
for priority, conn in new_conns:
|
||||||
|
self._conn_pool.put((priority, conn))
|
||||||
|
self._api_providers = new_providers
|
||||||
|
@ -1,53 +1,44 @@
|
|||||||
# Copyright 2009-2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
# a copy of the License at
|
# a copy of the License at
|
||||||
#
|
#
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
#
|
#
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
#
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
|
||||||
|
|
||||||
import client
|
|
||||||
import eventlet
|
import eventlet
|
||||||
import httplib
|
|
||||||
import logging
|
import logging
|
||||||
import request_eventlet
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from common import _conn_str
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client
|
||||||
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
|
||||||
|
request_eventlet)
|
||||||
|
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Default parameters.
|
|
||||||
DEFAULT_FAILOVER_TIME = 5
|
|
||||||
DEFAULT_CONCURRENT_CONNECTIONS = 3
|
|
||||||
DEFAULT_CONNECT_TIMEOUT = 5
|
|
||||||
GENERATION_ID_TIMEOUT = -1 # if set to -1 then disabled
|
|
||||||
|
|
||||||
|
class NvpApiClientEventlet(client.NvpApiClient):
|
||||||
class NvpApiClientEventlet(object):
|
|
||||||
'''Eventlet-based implementation of NvpApiClient ABC.'''
|
'''Eventlet-based implementation of NvpApiClient ABC.'''
|
||||||
|
|
||||||
CONN_IDLE_TIMEOUT = 60 * 15
|
|
||||||
|
|
||||||
def __init__(self, api_providers, user, password,
|
def __init__(self, api_providers, user, password,
|
||||||
concurrent_connections=DEFAULT_CONCURRENT_CONNECTIONS,
|
concurrent_connections=client.DEFAULT_CONCURRENT_CONNECTIONS,
|
||||||
use_https=True,
|
nvp_gen_timeout=client.GENERATION_ID_TIMEOUT, use_https=True,
|
||||||
connect_timeout=DEFAULT_CONNECT_TIMEOUT,
|
connect_timeout=client.DEFAULT_CONNECT_TIMEOUT):
|
||||||
failover_time=DEFAULT_FAILOVER_TIME,
|
|
||||||
nvp_gen_timeout=GENERATION_ID_TIMEOUT):
|
|
||||||
'''Constructor
|
'''Constructor
|
||||||
|
|
||||||
:param api_providers: a list of tuples of the form: (host, port,
|
:param api_providers: a list of tuples of the form: (host, port,
|
||||||
@ -57,242 +48,112 @@ class NvpApiClientEventlet(object):
|
|||||||
:param concurrent_connections: total number of concurrent connections.
|
:param concurrent_connections: total number of concurrent connections.
|
||||||
:param use_https: whether or not to use https for requests.
|
:param use_https: whether or not to use https for requests.
|
||||||
:param connect_timeout: connection timeout in seconds.
|
:param connect_timeout: connection timeout in seconds.
|
||||||
:param failover_time: time from when a connection pool is switched to
|
|
||||||
the next connection released via acquire_connection().
|
|
||||||
:param nvp_gen_timeout controls how long the generation id is kept
|
:param nvp_gen_timeout controls how long the generation id is kept
|
||||||
if set to -1 the generation id is never timed out
|
if set to -1 the generation id is never timed out
|
||||||
'''
|
'''
|
||||||
if not api_providers:
|
if not api_providers:
|
||||||
api_providers = []
|
api_providers = []
|
||||||
self._api_providers = set([tuple(p) for p in api_providers])
|
self._api_providers = set([tuple(p) for p in api_providers])
|
||||||
|
self._api_provider_data = {} # tuple(semaphore, nvp_session_cookie)
|
||||||
|
for p in self._api_providers:
|
||||||
|
self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None))
|
||||||
self._user = user
|
self._user = user
|
||||||
self._password = password
|
self._password = password
|
||||||
self._concurrent_connections = concurrent_connections
|
self._concurrent_connections = concurrent_connections
|
||||||
self._use_https = use_https
|
self._use_https = use_https
|
||||||
self._connect_timeout = connect_timeout
|
self._connect_timeout = connect_timeout
|
||||||
self._failover_time = failover_time
|
|
||||||
self._nvp_config_gen = None
|
self._nvp_config_gen = None
|
||||||
self._nvp_config_gen_ts = None
|
self._nvp_config_gen_ts = None
|
||||||
self._nvp_gen_timeout = nvp_gen_timeout
|
self._nvp_gen_timeout = nvp_gen_timeout
|
||||||
|
|
||||||
# Connection pool is a list of queues.
|
# Connection pool is a list of queues.
|
||||||
self._conn_pool = list()
|
self._conn_pool = eventlet.queue.PriorityQueue()
|
||||||
conn_pool_idx = 0
|
self._next_conn_priority = 1
|
||||||
for host, port, is_ssl in api_providers:
|
for host, port, is_ssl in api_providers:
|
||||||
provider_conn_pool = eventlet.queue.Queue(
|
|
||||||
maxsize=concurrent_connections)
|
|
||||||
for i in range(concurrent_connections):
|
for i in range(concurrent_connections):
|
||||||
# All connections in a provider_conn_poool have the
|
|
||||||
# same priority (they connect to the same server).
|
|
||||||
conn = self._create_connection(host, port, is_ssl)
|
conn = self._create_connection(host, port, is_ssl)
|
||||||
conn.idx = conn_pool_idx
|
self._conn_pool.put((self._next_conn_priority, conn))
|
||||||
provider_conn_pool.put(conn)
|
self._next_conn_priority += 1
|
||||||
|
|
||||||
self._conn_pool.append(provider_conn_pool)
|
def acquire_redirect_connection(self, conn_params, auto_login=True,
|
||||||
conn_pool_idx += 1
|
headers=None):
|
||||||
|
"""Check out or create connection to redirected NVP API server.
|
||||||
|
|
||||||
self._active_conn_pool_idx = 0
|
Args:
|
||||||
|
conn_params: tuple specifying target of redirect, see
|
||||||
|
self._conn_params()
|
||||||
|
auto_login: returned connection should have valid session cookie
|
||||||
|
headers: headers to pass on if auto_login
|
||||||
|
|
||||||
self._cookie = None
|
Returns: An available HTTPConnection instance corresponding to the
|
||||||
self._need_login = True
|
specified conn_params. If a connection did not previously
|
||||||
self._doing_login_sem = eventlet.semaphore.Semaphore(1)
|
exist, new connections are created with the highest prioity
|
||||||
|
in the connection pool and one of these new connections
|
||||||
|
returned.
|
||||||
|
"""
|
||||||
|
result_conn = None
|
||||||
|
data = self._get_provider_data(conn_params)
|
||||||
|
if data:
|
||||||
|
# redirect target already exists in provider data and connections
|
||||||
|
# to the provider have been added to the connection pool. Try to
|
||||||
|
# obtain a connection from the pool, note that it's possible that
|
||||||
|
# all connection to the provider are currently in use.
|
||||||
|
conns = []
|
||||||
|
while not self._conn_pool.empty():
|
||||||
|
priority, conn = self._conn_pool.get_nowait()
|
||||||
|
if not result_conn and self._conn_params(conn) == conn_params:
|
||||||
|
conn.priority = priority
|
||||||
|
result_conn = conn
|
||||||
|
else:
|
||||||
|
conns.append((priority, conn))
|
||||||
|
for priority, conn in conns:
|
||||||
|
self._conn_pool.put((priority, conn))
|
||||||
|
# hack: if no free connections available, create new connection
|
||||||
|
# and stash "no_release" attribute (so that we only exceed
|
||||||
|
# self._concurrent_connections temporarily)
|
||||||
|
if not result_conn:
|
||||||
|
conn = self._create_connection(*conn_params)
|
||||||
|
conn.priority = 0 # redirect connections ahve highest priority
|
||||||
|
conn.no_release = True
|
||||||
|
result_conn = conn
|
||||||
|
else:
|
||||||
|
#redirect target not already known, setup provider lists
|
||||||
|
self._api_providers.update([conn_params])
|
||||||
|
self._set_provider_data(conn_params,
|
||||||
|
(eventlet.semaphore.Semaphore(1), None))
|
||||||
|
# redirects occur during cluster upgrades, i.e. results to old
|
||||||
|
# redirects to new, so give redirect targets highest priority
|
||||||
|
priority = 0
|
||||||
|
for i in range(self._concurrent_connections):
|
||||||
|
conn = self._create_connection(*conn_params)
|
||||||
|
conn.priority = priority
|
||||||
|
if i == self._concurrent_connections - 1:
|
||||||
|
break
|
||||||
|
self._conn_pool.put((priority, conn))
|
||||||
|
result_conn = conn
|
||||||
|
if result_conn:
|
||||||
|
result_conn.last_used = time.time()
|
||||||
|
if auto_login and self.auth_cookie(conn) is None:
|
||||||
|
self._wait_for_login(result_conn, headers)
|
||||||
|
return result_conn
|
||||||
|
|
||||||
def _create_connection(self, host, port, is_ssl):
|
def _login(self, conn=None, headers=None):
|
||||||
if is_ssl:
|
|
||||||
return httplib.HTTPSConnection(host, port,
|
|
||||||
timeout=self._connect_timeout)
|
|
||||||
return httplib.HTTPConnection(host, port,
|
|
||||||
timeout=self._connect_timeout)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _conn_params(http_conn):
|
|
||||||
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
|
|
||||||
return (http_conn.host, http_conn.port, is_ssl)
|
|
||||||
|
|
||||||
def update_providers(self, api_providers):
|
|
||||||
raise Exception(_('update_providers() not implemented.'))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user(self):
|
|
||||||
return self._user
|
|
||||||
|
|
||||||
@property
|
|
||||||
def password(self):
|
|
||||||
return self._password
|
|
||||||
|
|
||||||
@property
|
|
||||||
def nvp_config_gen(self):
|
|
||||||
# If nvp_gen_timeout is not -1 then:
|
|
||||||
# Maintain a timestamp along with the generation ID. Hold onto the
|
|
||||||
# ID long enough to be useful and block on sequential requests but
|
|
||||||
# not long enough to persist when Onix db is cleared, which resets
|
|
||||||
# the generation ID, causing the DAL to block indefinitely with some
|
|
||||||
# number that's higher than the cluster's value.
|
|
||||||
if self._nvp_gen_timeout != -1:
|
|
||||||
ts = self._nvp_config_gen_ts
|
|
||||||
if ts is not None:
|
|
||||||
if (time.time() - ts) > self._nvp_gen_timeout:
|
|
||||||
return None
|
|
||||||
return self._nvp_config_gen
|
|
||||||
|
|
||||||
@nvp_config_gen.setter
|
|
||||||
def nvp_config_gen(self, value):
|
|
||||||
if self._nvp_config_gen != value:
|
|
||||||
if self._nvp_gen_timeout != -1:
|
|
||||||
self._nvp_config_gen_ts = time.time()
|
|
||||||
self._nvp_config_gen = value
|
|
||||||
|
|
||||||
@property
|
|
||||||
def auth_cookie(self):
|
|
||||||
return self._cookie
|
|
||||||
|
|
||||||
def acquire_connection(self, rid=-1):
|
|
||||||
'''Check out an available HTTPConnection instance.
|
|
||||||
|
|
||||||
Blocks until a connection is available.
|
|
||||||
|
|
||||||
:param rid: request id passed in from request eventlet.
|
|
||||||
:returns: An available HTTPConnection instance or None if no
|
|
||||||
api_providers are configured.
|
|
||||||
'''
|
|
||||||
if not self._api_providers:
|
|
||||||
LOG.warn(_("[%d] no API providers currently available."), rid)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# The sleep time is to give controllers time to become consistent after
|
|
||||||
# there has been a change in the controller used as the api_provider.
|
|
||||||
now = time.time()
|
|
||||||
if now < getattr(self, '_issue_conn_barrier', now):
|
|
||||||
LOG.warn(_("[%d] Waiting for failover timer to expire."), rid)
|
|
||||||
time.sleep(self._issue_conn_barrier - now)
|
|
||||||
|
|
||||||
# Print out a warning if all connections are in use.
|
|
||||||
if self._conn_pool[self._active_conn_pool_idx].empty():
|
|
||||||
LOG.debug(_("[%d] Waiting to acquire client connection."), rid)
|
|
||||||
|
|
||||||
# Try to acquire a connection (block in get() until connection
|
|
||||||
# available or timeout occurs).
|
|
||||||
active_conn_pool_idx = self._active_conn_pool_idx
|
|
||||||
conn = self._conn_pool[active_conn_pool_idx].get()
|
|
||||||
|
|
||||||
if active_conn_pool_idx != self._active_conn_pool_idx:
|
|
||||||
# active_conn_pool became inactive while we were waiting.
|
|
||||||
# Put connection back on old pool and try again.
|
|
||||||
LOG.warn(_("[%(rid)d] Active pool expired while waiting for "
|
|
||||||
"connection: %(conn)s"),
|
|
||||||
{'rid': rid, 'conn': _conn_str(conn)})
|
|
||||||
self._conn_pool[active_conn_pool_idx].put(conn)
|
|
||||||
return self.acquire_connection(rid=rid)
|
|
||||||
|
|
||||||
# Check if the connection has been idle too long.
|
|
||||||
now = time.time()
|
|
||||||
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
|
|
||||||
LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
|
|
||||||
"seconds; reconnecting."),
|
|
||||||
{'rid': rid, 'conn': _conn_str(conn),
|
|
||||||
'sec': now - conn.last_used})
|
|
||||||
conn = self._create_connection(*self._conn_params(conn))
|
|
||||||
|
|
||||||
# Stash conn pool so conn knows where to go when it releases.
|
|
||||||
conn.idx = self._active_conn_pool_idx
|
|
||||||
|
|
||||||
conn.last_used = now
|
|
||||||
qsize = self._conn_pool[self._active_conn_pool_idx].qsize()
|
|
||||||
LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
|
|
||||||
"connection(s) available."),
|
|
||||||
{'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
|
|
||||||
return conn
|
|
||||||
|
|
||||||
def release_connection(self, http_conn, bad_state=False, rid=-1):
|
|
||||||
'''Mark HTTPConnection instance as available for check-out.
|
|
||||||
|
|
||||||
:param http_conn: An HTTPConnection instance obtained from this
|
|
||||||
instance.
|
|
||||||
:param bad_state: True if http_conn is known to be in a bad state
|
|
||||||
(e.g. connection fault.)
|
|
||||||
:param rid: request id passed in from request eventlet.
|
|
||||||
'''
|
|
||||||
if self._conn_params(http_conn) not in self._api_providers:
|
|
||||||
LOG.warn(_("[%(rid)d] Released connection '%(conn)s' is not an "
|
|
||||||
"API provider for the cluster"),
|
|
||||||
{'rid': rid, 'conn': _conn_str(http_conn)})
|
|
||||||
return
|
|
||||||
|
|
||||||
# Retrieve "home" connection pool.
|
|
||||||
conn_pool_idx = http_conn.idx
|
|
||||||
conn_pool = self._conn_pool[conn_pool_idx]
|
|
||||||
if bad_state:
|
|
||||||
# Reconnect to provider.
|
|
||||||
LOG.warn(_("[%(rid)d] Connection returned in bad state, "
|
|
||||||
"reconnecting to %(conn)s"),
|
|
||||||
{'rid': rid, 'conn': _conn_str(http_conn)})
|
|
||||||
http_conn = self._create_connection(*self._conn_params(http_conn))
|
|
||||||
http_conn.idx = conn_pool_idx
|
|
||||||
|
|
||||||
if self._active_conn_pool_idx == http_conn.idx:
|
|
||||||
# This pool is no longer in a good state. Switch to next pool.
|
|
||||||
self._active_conn_pool_idx += 1
|
|
||||||
self._active_conn_pool_idx %= len(self._conn_pool)
|
|
||||||
LOG.warn(_("[%(rid)d] Switched active_conn_pool from "
|
|
||||||
"%(idx)d to %(pool_idx)d."),
|
|
||||||
{'rid': rid, 'idx': http_conn.idx,
|
|
||||||
'pool_idx': self._active_conn_pool_idx})
|
|
||||||
|
|
||||||
# No connections to the new provider allowed until after this
|
|
||||||
# timer has expired (allow time for synchronization).
|
|
||||||
self._issue_conn_barrier = time.time() + self._failover_time
|
|
||||||
|
|
||||||
conn_pool.put(http_conn)
|
|
||||||
LOG.debug(_("[%(rid)d] Released connection %(conn)s. "
|
|
||||||
"%(qsize)d connection(s) available."),
|
|
||||||
{'rid': rid, 'conn': _conn_str(http_conn),
|
|
||||||
'qsize': conn_pool.qsize()})
|
|
||||||
|
|
||||||
@property
|
|
||||||
def need_login(self):
|
|
||||||
return self._need_login
|
|
||||||
|
|
||||||
@need_login.setter
|
|
||||||
def need_login(self, val=True):
|
|
||||||
self._need_login = val
|
|
||||||
|
|
||||||
def wait_for_login(self):
|
|
||||||
'''Block until a login has occurred for the current API provider.'''
|
|
||||||
if self._need_login:
|
|
||||||
if self._doing_login_sem.acquire(blocking=False):
|
|
||||||
self.login()
|
|
||||||
self._doing_login_sem.release()
|
|
||||||
else:
|
|
||||||
LOG.debug(_("Waiting for auth to complete"))
|
|
||||||
self._doing_login_sem.acquire()
|
|
||||||
self._doing_login_sem.release()
|
|
||||||
return self._cookie
|
|
||||||
|
|
||||||
def login(self):
|
|
||||||
'''Issue login request and update authentication cookie.'''
|
'''Issue login request and update authentication cookie.'''
|
||||||
|
cookie = None
|
||||||
g = request_eventlet.NvpLoginRequestEventlet(
|
g = request_eventlet.NvpLoginRequestEventlet(
|
||||||
self, self._user, self._password)
|
self, self._user, self._password, conn, headers)
|
||||||
g.start()
|
g.start()
|
||||||
ret = g.join()
|
ret = g.join()
|
||||||
|
|
||||||
if ret:
|
if ret:
|
||||||
if isinstance(ret, Exception):
|
if isinstance(ret, Exception):
|
||||||
LOG.error(_('NvpApiClient: login error "%s"'), ret)
|
LOG.error(_('NvpApiClient: login error "%s"'), ret)
|
||||||
raise ret
|
raise ret
|
||||||
|
|
||||||
self._cookie = None
|
|
||||||
cookie = ret.getheader("Set-Cookie")
|
cookie = ret.getheader("Set-Cookie")
|
||||||
if cookie:
|
if cookie:
|
||||||
LOG.debug(_("Saving new authentication cookie '%s'"), cookie)
|
LOG.debug(_("Saving new authentication cookie '%s'"), cookie)
|
||||||
self._cookie = cookie
|
|
||||||
self._need_login = False
|
|
||||||
|
|
||||||
# TODO: or ret is an error.
|
|
||||||
if not ret:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return self._cookie
|
|
||||||
|
|
||||||
|
return cookie
|
||||||
|
|
||||||
# Register as subclass.
|
# Register as subclass.
|
||||||
client.NvpApiClient.register(NvpApiClientEventlet)
|
client.NvpApiClient.register(NvpApiClientEventlet)
|
||||||
|
@ -1,23 +1,24 @@
|
|||||||
# Copyright 2009-2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
# a copy of the License at
|
# a copy of the License at
|
||||||
#
|
#
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
#
|
#
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
#
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
|
||||||
|
|
||||||
import httplib
|
import httplib
|
||||||
import mock
|
|
||||||
|
|
||||||
|
|
||||||
def _conn_str(conn):
|
def _conn_str(conn):
|
||||||
@ -25,8 +26,6 @@ def _conn_str(conn):
|
|||||||
proto = "https://"
|
proto = "https://"
|
||||||
elif isinstance(conn, httplib.HTTPConnection):
|
elif isinstance(conn, httplib.HTTPConnection):
|
||||||
proto = "http://"
|
proto = "http://"
|
||||||
elif isinstance(conn, mock.Mock):
|
|
||||||
proto = "http://"
|
|
||||||
else:
|
else:
|
||||||
raise TypeError(_('_conn_str() invalid connection type: %s') %
|
raise TypeError(_('_conn_str() invalid connection type: %s') %
|
||||||
type(conn))
|
type(conn))
|
||||||
|
@ -1,27 +1,49 @@
|
|||||||
# Copyright 2009-2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
# a copy of the License at
|
# a copy of the License at
|
||||||
#
|
#
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
#
|
#
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
#
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
|
||||||
|
|
||||||
from abc import ABCMeta
|
from abc import ABCMeta
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from abc import abstractproperty
|
import copy
|
||||||
|
import httplib
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import urlparse
|
||||||
|
|
||||||
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
|
||||||
|
_conn_str)
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Default parameters.
|
||||||
|
DEFAULT_REQUEST_TIMEOUT = 30
|
||||||
|
DEFAULT_HTTP_TIMEOUT = 10
|
||||||
|
DEFAULT_RETRIES = 2
|
||||||
|
DEFAULT_REDIRECTS = 2
|
||||||
|
DEFAULT_API_REQUEST_POOL_SIZE = 1000
|
||||||
|
DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
|
||||||
|
DOWNLOAD_TIMEOUT = 180 # The UI code has a coorespoind 190 sec timeout
|
||||||
|
# for downloads, see: django/nvp_console/views.py
|
||||||
|
|
||||||
|
|
||||||
class NvpApiRequest:
|
class NvpApiRequest(object):
|
||||||
'''An abstract baseclass for all ApiRequest implementations.
|
'''An abstract baseclass for all ApiRequest implementations.
|
||||||
|
|
||||||
This defines the interface and property structure for both eventlet and
|
This defines the interface and property structure for both eventlet and
|
||||||
@ -30,6 +52,22 @@ class NvpApiRequest:
|
|||||||
|
|
||||||
__metaclass__ = ABCMeta
|
__metaclass__ = ABCMeta
|
||||||
|
|
||||||
|
# List of allowed status codes.
|
||||||
|
ALLOWED_STATUS_CODES = [
|
||||||
|
httplib.OK,
|
||||||
|
httplib.CREATED,
|
||||||
|
httplib.NO_CONTENT,
|
||||||
|
httplib.MOVED_PERMANENTLY,
|
||||||
|
httplib.TEMPORARY_REDIRECT,
|
||||||
|
httplib.BAD_REQUEST,
|
||||||
|
httplib.UNAUTHORIZED,
|
||||||
|
httplib.FORBIDDEN,
|
||||||
|
httplib.NOT_FOUND,
|
||||||
|
httplib.CONFLICT,
|
||||||
|
httplib.INTERNAL_SERVER_ERROR,
|
||||||
|
httplib.SERVICE_UNAVAILABLE
|
||||||
|
]
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def start(self):
|
def start(self):
|
||||||
pass
|
pass
|
||||||
@ -42,6 +80,200 @@ class NvpApiRequest:
|
|||||||
def copy(self):
|
def copy(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractproperty
|
def _issue_request(self):
|
||||||
|
'''Issue a request to a provider.'''
|
||||||
|
conn = (self._client_conn or
|
||||||
|
self._api_client.acquire_connection(True,
|
||||||
|
copy.copy(self._headers),
|
||||||
|
rid=self._rid()))
|
||||||
|
if conn is None:
|
||||||
|
error = Exception("No API connections available")
|
||||||
|
self._request_error = error
|
||||||
|
return error
|
||||||
|
|
||||||
|
url = self._url
|
||||||
|
LOG.debug(_("[%d] Issuing - request '%s'"),
|
||||||
|
self._rid(), self._request_str(conn, url))
|
||||||
|
issued_time = time.time()
|
||||||
|
is_conn_error = False
|
||||||
|
is_conn_service_unavail = False
|
||||||
|
try:
|
||||||
|
redirects = 0
|
||||||
|
while (redirects <= self._redirects):
|
||||||
|
# Update connection with user specified request timeout,
|
||||||
|
# the connect timeout is usually smaller so we only set
|
||||||
|
# the request timeout after a connection is established
|
||||||
|
if conn.sock is None:
|
||||||
|
conn.connect()
|
||||||
|
conn.sock.settimeout(self._http_timeout)
|
||||||
|
elif conn.sock.gettimeout() != self._http_timeout:
|
||||||
|
conn.sock.settimeout(self._http_timeout)
|
||||||
|
|
||||||
|
headers = copy.copy(self._headers)
|
||||||
|
cookie = self._api_client.auth_cookie(conn)
|
||||||
|
if cookie:
|
||||||
|
headers["Cookie"] = cookie
|
||||||
|
|
||||||
|
gen = self._api_client.nvp_config_gen
|
||||||
|
if gen:
|
||||||
|
headers["X-Nvp-Wait-For-Config-Generation"] = gen
|
||||||
|
LOG.debug(_("Setting %s request header: '%s'"),
|
||||||
|
'X-Nvp-Wait-For-Config-Generation', gen)
|
||||||
|
try:
|
||||||
|
conn.request(self._method, url, self._body, headers)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.warn(_("[%d] Exception issuing request: '%s'"),
|
||||||
|
self._rid(), e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
response = conn.getresponse()
|
||||||
|
response.body = response.read()
|
||||||
|
response.headers = response.getheaders()
|
||||||
|
LOG.debug(_("[%d] Completed request '%s': %s (%0.2f seconds)"),
|
||||||
|
self._rid(), self._request_str(conn, url),
|
||||||
|
response.status, time.time() - issued_time)
|
||||||
|
|
||||||
|
new_gen = response.getheader('X-Nvp-Config-Generation', None)
|
||||||
|
if new_gen:
|
||||||
|
LOG.debug(_("Reading '%s' response header: '%s'"),
|
||||||
|
'X-Nvp-config-Generation', new_gen)
|
||||||
|
if (self._api_client.nvp_config_gen is None or
|
||||||
|
self._api_client.nvp_config_gen < int(new_gen)):
|
||||||
|
self._api_client.nvp_config_gen = int(new_gen)
|
||||||
|
|
||||||
|
if response.status == httplib.UNAUTHORIZED:
|
||||||
|
|
||||||
|
if cookie is None and self._url != "/ws.v1/login":
|
||||||
|
# The connection still has no valid cookie despite
|
||||||
|
# attemps to authenticate and the request has failed
|
||||||
|
# with unauthorized status code. If this isn't a
|
||||||
|
# a request to authenticate, we should abort the
|
||||||
|
# request since there is no point in retrying.
|
||||||
|
self._abort = True
|
||||||
|
else:
|
||||||
|
# If request is unauthorized, clear the session cookie
|
||||||
|
# for the current provider so that subsequent requests
|
||||||
|
# to the same provider triggers re-authentication.
|
||||||
|
self._api_client.set_auth_cookie(conn, None)
|
||||||
|
|
||||||
|
self._api_client.set_auth_cookie(conn, None)
|
||||||
|
elif response.status == httplib.SERVICE_UNAVAILABLE:
|
||||||
|
is_conn_service_unavail = True
|
||||||
|
|
||||||
|
if response.status not in [httplib.MOVED_PERMANENTLY,
|
||||||
|
httplib.TEMPORARY_REDIRECT]:
|
||||||
|
break
|
||||||
|
elif redirects >= self._redirects:
|
||||||
|
LOG.info(_("[%d] Maximum redirects exceeded, aborting "
|
||||||
|
"request"), self._rid())
|
||||||
|
break
|
||||||
|
redirects += 1
|
||||||
|
|
||||||
|
conn, url = self._redirect_params(conn, response.headers,
|
||||||
|
self._client_conn is None)
|
||||||
|
if url is None:
|
||||||
|
response.status = httplib.INTERNAL_SERVER_ERROR
|
||||||
|
break
|
||||||
|
LOG.info(_("[%d] Redirecting request to: '%s'"),
|
||||||
|
self._rid(), self._request_str(conn, url))
|
||||||
|
|
||||||
|
# If we receive any of these responses, then
|
||||||
|
# our server did not process our request and may be in an
|
||||||
|
# errored state. Raise an exception, which will cause the
|
||||||
|
# the conn to be released with is_conn_error == True
|
||||||
|
# which puts the conn on the back of the client's priority
|
||||||
|
# queue.
|
||||||
|
if response.status >= 500:
|
||||||
|
LOG.warn(_("[%d] Request '%s %s' received: %s"),
|
||||||
|
self._rid(), self._method, self._url,
|
||||||
|
response.status)
|
||||||
|
raise Exception('Server error return: %s' %
|
||||||
|
response.status)
|
||||||
|
return response
|
||||||
|
except Exception as e:
|
||||||
|
if isinstance(e, httplib.BadStatusLine):
|
||||||
|
msg = "Invalid server response"
|
||||||
|
else:
|
||||||
|
msg = unicode(e)
|
||||||
|
LOG.warn(_("[%d] Failed request '%s': '%s' (%0.2f seconds)"),
|
||||||
|
self._rid(), self._request_str(conn, url), msg,
|
||||||
|
time.time() - issued_time)
|
||||||
|
self._request_error = e
|
||||||
|
is_conn_error = True
|
||||||
|
return e
|
||||||
|
finally:
|
||||||
|
# Make sure we release the original connection provided by the
|
||||||
|
# acquire_connection() call above.
|
||||||
|
if self._client_conn is None:
|
||||||
|
self._api_client.release_connection(conn, is_conn_error,
|
||||||
|
is_conn_service_unavail,
|
||||||
|
rid=self._rid())
|
||||||
|
|
||||||
|
def _redirect_params(self, conn, headers, allow_release_conn=False):
|
||||||
|
"""Process redirect response, create new connection if necessary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conn: connection that returned the redirect response
|
||||||
|
headers: response headers of the redirect response
|
||||||
|
allow_release_conn: if redirecting to a different server,
|
||||||
|
release existing connection back to connection pool.
|
||||||
|
|
||||||
|
Returns: Return tuple(conn, url) where conn is a connection object
|
||||||
|
to the redirect target and url is the path of the API request
|
||||||
|
"""
|
||||||
|
|
||||||
|
url = None
|
||||||
|
for name, value in headers:
|
||||||
|
if name.lower() == "location":
|
||||||
|
url = value
|
||||||
|
break
|
||||||
|
if not url:
|
||||||
|
LOG.warn(_("[%d] Received redirect status without location header"
|
||||||
|
" field"), self._rid())
|
||||||
|
return (conn, None)
|
||||||
|
# Accept location with the following format:
|
||||||
|
# 1. /path, redirect to same node
|
||||||
|
# 2. scheme://hostname:[port]/path where scheme is https or http
|
||||||
|
# Reject others
|
||||||
|
# 3. e.g. relative paths, unsupported scheme, unspecified host
|
||||||
|
result = urlparse.urlparse(url)
|
||||||
|
if not result.scheme and not result.hostname and result.path:
|
||||||
|
if result.path[0] == "/":
|
||||||
|
if result.query:
|
||||||
|
url = "%s?%s" % (result.path, result.query)
|
||||||
|
else:
|
||||||
|
url = result.path
|
||||||
|
return (conn, url) # case 1
|
||||||
|
else:
|
||||||
|
LOG.warn(_("[%d] Received invalid redirect location: '%s'"),
|
||||||
|
self._rid(), url)
|
||||||
|
return (conn, None) # case 3
|
||||||
|
elif result.scheme not in ["http", "https"] or not result.hostname:
|
||||||
|
LOG.warn(_("[%d] Received malformed redirect location: %s"),
|
||||||
|
self._rid(), url)
|
||||||
|
return (conn, None) # case 3
|
||||||
|
# case 2, redirect location includes a scheme
|
||||||
|
# so setup a new connection and authenticate
|
||||||
|
if allow_release_conn:
|
||||||
|
self._api_client.release_connection(conn)
|
||||||
|
conn_params = (result.hostname, result.port, result.scheme == "https")
|
||||||
|
conn = self._api_client.acquire_redirect_connection(conn_params, True,
|
||||||
|
self._headers)
|
||||||
|
if result.query:
|
||||||
|
url = "%s?%s" % (result.path, result.query)
|
||||||
|
else:
|
||||||
|
url = result.path
|
||||||
|
return (conn, url)
|
||||||
|
|
||||||
|
def _rid(self):
|
||||||
|
'''Return current request id.'''
|
||||||
|
return self._request_id
|
||||||
|
|
||||||
|
@property
|
||||||
def request_error(self):
|
def request_error(self):
|
||||||
pass
|
'''Return any errors associated with this instance.'''
|
||||||
|
return self._request_error
|
||||||
|
|
||||||
|
def _request_str(self, conn, url):
|
||||||
|
'''Return string representation of connection.'''
|
||||||
|
return "%s %s/%s" % (self._method, _conn_str(conn), url)
|
||||||
|
@ -1,75 +1,46 @@
|
|||||||
# Copyright 2009-2012 Nicira Networks, Inc.
|
# Copyright 2012 Nicira, Inc.
|
||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
# a copy of the License at
|
# a copy of the License at
|
||||||
#
|
#
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
#
|
#
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
#
|
#
|
||||||
|
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||||
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import eventlet
|
import eventlet
|
||||||
import httplib
|
import httplib
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import request
|
|
||||||
import time
|
|
||||||
import urllib
|
import urllib
|
||||||
import urlparse
|
|
||||||
|
|
||||||
import client_eventlet
|
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import request
|
||||||
from common import _conn_str
|
|
||||||
from eventlet import timeout
|
|
||||||
|
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
lg = logging.getLogger("nvp_api_request")
|
LOG = logging.getLogger(__name__)
|
||||||
USER_AGENT = "NVP eventlet client/1.0"
|
USER_AGENT = "NVP eventlet client/1.0"
|
||||||
|
|
||||||
# Default parameters.
|
|
||||||
DEFAULT_REQUEST_TIMEOUT = 30
|
|
||||||
DEFAULT_HTTP_TIMEOUT = 10
|
|
||||||
DEFAULT_RETRIES = 2
|
|
||||||
DEFAULT_REDIRECTS = 2
|
|
||||||
DEFAULT_API_REQUEST_POOL_SIZE = 1000
|
|
||||||
DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
|
|
||||||
|
|
||||||
|
class NvpApiRequestEventlet(request.NvpApiRequest):
|
||||||
class NvpApiRequestEventlet:
|
|
||||||
'''Eventlet-based ApiRequest class.
|
'''Eventlet-based ApiRequest class.
|
||||||
|
|
||||||
This class will form the basis for eventlet-based ApiRequest classes
|
This class will form the basis for eventlet-based ApiRequest classes
|
||||||
(e.g. those used by the Quantum NVP Plugin).
|
(e.g. those used by the Quantum NVP Plugin).
|
||||||
'''
|
'''
|
||||||
|
|
||||||
# List of allowed status codes.
|
|
||||||
ALLOWED_STATUS_CODES = [
|
|
||||||
httplib.OK,
|
|
||||||
httplib.CREATED,
|
|
||||||
httplib.NO_CONTENT,
|
|
||||||
httplib.MOVED_PERMANENTLY,
|
|
||||||
httplib.TEMPORARY_REDIRECT,
|
|
||||||
httplib.BAD_REQUEST,
|
|
||||||
httplib.UNAUTHORIZED,
|
|
||||||
httplib.FORBIDDEN,
|
|
||||||
httplib.NOT_FOUND,
|
|
||||||
httplib.CONFLICT,
|
|
||||||
httplib.INTERNAL_SERVER_ERROR,
|
|
||||||
httplib.SERVICE_UNAVAILABLE
|
|
||||||
]
|
|
||||||
|
|
||||||
# Maximum number of green threads present in the system at one time.
|
# Maximum number of green threads present in the system at one time.
|
||||||
API_REQUEST_POOL_SIZE = DEFAULT_API_REQUEST_POOL_SIZE
|
API_REQUEST_POOL_SIZE = request.DEFAULT_API_REQUEST_POOL_SIZE
|
||||||
|
|
||||||
# Pool of green threads. One green thread is allocated per incoming
|
# Pool of green threads. One green thread is allocated per incoming
|
||||||
# request. Incoming requests will block when the pool is empty.
|
# request. Incoming requests will block when the pool is empty.
|
||||||
@ -77,18 +48,18 @@ class NvpApiRequestEventlet:
|
|||||||
|
|
||||||
# A unique id is assigned to each incoming request. When the current
|
# A unique id is assigned to each incoming request. When the current
|
||||||
# request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0.
|
# request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0.
|
||||||
MAXIMUM_REQUEST_ID = DEFAULT_MAXIMUM_REQUEST_ID
|
MAXIMUM_REQUEST_ID = request.DEFAULT_MAXIMUM_REQUEST_ID
|
||||||
|
|
||||||
# The request id for the next incoming request.
|
# The request id for the next incoming request.
|
||||||
CURRENT_REQUEST_ID = 0
|
CURRENT_REQUEST_ID = 0
|
||||||
|
|
||||||
def __init__(self, nvp_api_client, url, method="GET", body=None,
|
def __init__(self, nvp_api_client, url, method="GET", body=None,
|
||||||
headers=None,
|
headers=None,
|
||||||
request_timeout=DEFAULT_REQUEST_TIMEOUT,
|
request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
|
||||||
retries=DEFAULT_RETRIES,
|
retries=request.DEFAULT_RETRIES,
|
||||||
auto_login=True,
|
auto_login=True,
|
||||||
redirects=DEFAULT_REDIRECTS,
|
redirects=request.DEFAULT_REDIRECTS,
|
||||||
http_timeout=DEFAULT_HTTP_TIMEOUT):
|
http_timeout=request.DEFAULT_HTTP_TIMEOUT, client_conn=None):
|
||||||
'''Constructor.'''
|
'''Constructor.'''
|
||||||
self._api_client = nvp_api_client
|
self._api_client = nvp_api_client
|
||||||
self._url = url
|
self._url = url
|
||||||
@ -100,6 +71,8 @@ class NvpApiRequestEventlet:
|
|||||||
self._auto_login = auto_login
|
self._auto_login = auto_login
|
||||||
self._redirects = redirects
|
self._redirects = redirects
|
||||||
self._http_timeout = http_timeout
|
self._http_timeout = http_timeout
|
||||||
|
self._client_conn = client_conn
|
||||||
|
self._abort = False
|
||||||
|
|
||||||
self._request_error = None
|
self._request_error = None
|
||||||
|
|
||||||
@ -126,10 +99,6 @@ class NvpApiRequestEventlet:
|
|||||||
'''Spawn a new green thread with the supplied function and args.'''
|
'''Spawn a new green thread with the supplied function and args.'''
|
||||||
return self.__class__._spawn(func, *args, **kwargs)
|
return self.__class__._spawn(func, *args, **kwargs)
|
||||||
|
|
||||||
def _rid(self):
|
|
||||||
'''Return current request id.'''
|
|
||||||
return self._request_id
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def joinall(cls):
|
def joinall(cls):
|
||||||
'''Wait for all outstanding requests to complete.'''
|
'''Wait for all outstanding requests to complete.'''
|
||||||
@ -152,197 +121,19 @@ class NvpApiRequestEventlet:
|
|||||||
self._headers, self._request_timeout, self._retries,
|
self._headers, self._request_timeout, self._retries,
|
||||||
self._auto_login, self._redirects, self._http_timeout)
|
self._auto_login, self._redirects, self._http_timeout)
|
||||||
|
|
||||||
@property
|
|
||||||
def request_error(self):
|
|
||||||
'''Return any errors associated with this instance.'''
|
|
||||||
return self._request_error
|
|
||||||
|
|
||||||
def _run(self):
|
def _run(self):
|
||||||
'''Method executed within green thread.'''
|
'''Method executed within green thread.'''
|
||||||
if self._request_timeout:
|
if self._request_timeout:
|
||||||
# No timeout exception escapes the with block.
|
# No timeout exception escapes the with block.
|
||||||
with timeout.Timeout(self._request_timeout, False):
|
with eventlet.timeout.Timeout(self._request_timeout, False):
|
||||||
return self._handle_request()
|
return self._handle_request()
|
||||||
|
|
||||||
lg.info(_('[%d] Request timeout.'), self._rid())
|
LOG.info(_('[%d] Request timeout.'), self._rid())
|
||||||
self._request_error = Exception(_('Request timeout'))
|
self._request_error = Exception(_('Request timeout'))
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
return self._handle_request()
|
return self._handle_request()
|
||||||
|
|
||||||
def _request_str(self, conn, url):
|
|
||||||
'''Return string representation of connection.'''
|
|
||||||
return "%s %s/%s" % (self._method, _conn_str(conn), url)
|
|
||||||
|
|
||||||
def _issue_request(self):
|
|
||||||
'''Issue a request to a provider.'''
|
|
||||||
conn = self._api_client.acquire_connection(rid=self._rid())
|
|
||||||
if conn is None:
|
|
||||||
error = Exception(_("No API connections available"))
|
|
||||||
self._request_error = error
|
|
||||||
return error
|
|
||||||
|
|
||||||
# Preserve the acquired connection as conn may be over-written by
|
|
||||||
# redirects below.
|
|
||||||
acquired_conn = conn
|
|
||||||
|
|
||||||
url = self._url
|
|
||||||
lg.debug(_("[%(rid)d] Issuing - request '%(req)s'"),
|
|
||||||
{'rid': self._rid(),
|
|
||||||
'req': self._request_str(conn, url)})
|
|
||||||
issued_time = time.time()
|
|
||||||
is_conn_error = False
|
|
||||||
try:
|
|
||||||
redirects = 0
|
|
||||||
while (redirects <= self._redirects):
|
|
||||||
# Update connection with user specified request timeout,
|
|
||||||
# the connect timeout is usually smaller so we only set
|
|
||||||
# the request timeout after a connection is established
|
|
||||||
if conn.sock is None:
|
|
||||||
conn.connect()
|
|
||||||
conn.sock.settimeout(self._http_timeout)
|
|
||||||
elif conn.sock.gettimeout() != self._http_timeout:
|
|
||||||
conn.sock.settimeout(self._http_timeout)
|
|
||||||
|
|
||||||
headers = copy.copy(self._headers)
|
|
||||||
gen = self._api_client.nvp_config_gen
|
|
||||||
if gen:
|
|
||||||
headers["X-Nvp-Wait-For-Config-Generation"] = gen
|
|
||||||
lg.debug(_("Setting %(header)s request header: %(gen)s"),
|
|
||||||
{'header': 'X-Nvp-Wait-For-Config-Generation',
|
|
||||||
'gen': gen})
|
|
||||||
try:
|
|
||||||
conn.request(self._method, url, self._body, headers)
|
|
||||||
except Exception as e:
|
|
||||||
lg.warn(_('[%(rid)d] Exception issuing request: %(e)s'),
|
|
||||||
{'rid': self._rid(), 'e': e})
|
|
||||||
raise e
|
|
||||||
|
|
||||||
response = conn.getresponse()
|
|
||||||
response.body = response.read()
|
|
||||||
response.headers = response.getheaders()
|
|
||||||
lg.debug(_("[%(rid)d] Completed request '%(req)s': %(status)s "
|
|
||||||
"(%(time)0.2f seconds)"),
|
|
||||||
{'rid': self._rid(),
|
|
||||||
'req': self._request_str(conn, url),
|
|
||||||
'status': response.status,
|
|
||||||
'time': time.time() - issued_time})
|
|
||||||
|
|
||||||
new_gen = response.getheader('X-Nvp-Config-Generation', None)
|
|
||||||
if new_gen:
|
|
||||||
lg.debug(_("Reading %(header)s response header: %(gen)s"),
|
|
||||||
{'header': 'X-Nvp-config-Generation',
|
|
||||||
'gen': new_gen})
|
|
||||||
if (self._api_client.nvp_config_gen is None or
|
|
||||||
self._api_client.nvp_config_gen < int(new_gen)):
|
|
||||||
self._api_client.nvp_config_gen = int(new_gen)
|
|
||||||
|
|
||||||
if response.status not in [httplib.MOVED_PERMANENTLY,
|
|
||||||
httplib.TEMPORARY_REDIRECT]:
|
|
||||||
break
|
|
||||||
elif redirects >= self._redirects:
|
|
||||||
lg.info(_("[%d] Maximum redirects exceeded, aborting "
|
|
||||||
"request"), self._rid())
|
|
||||||
break
|
|
||||||
redirects += 1
|
|
||||||
|
|
||||||
# In the following call, conn is replaced by the connection
|
|
||||||
# specified in the redirect response from the server.
|
|
||||||
conn, url = self._redirect_params(conn, response.headers)
|
|
||||||
if url is None:
|
|
||||||
response.status = httplib.INTERNAL_SERVER_ERROR
|
|
||||||
break
|
|
||||||
lg.info(_("[%(rid)d] Redirecting request to: %(req)s"),
|
|
||||||
{'rid': self._rid(),
|
|
||||||
'req': self._request_str(conn, url)})
|
|
||||||
|
|
||||||
# FIX for #9415. If we receive any of these responses, then
|
|
||||||
# our server did not process our request and may be in an
|
|
||||||
# errored state. Raise an exception, which will cause the
|
|
||||||
# the conn to be released with is_conn_error == True
|
|
||||||
# which puts the conn on the back of the client's priority
|
|
||||||
# queue.
|
|
||||||
if response.status >= 500:
|
|
||||||
lg.warn(_("[%(rid)d] Request '%(method)s %(url)s' "
|
|
||||||
"received: %(status)s"),
|
|
||||||
{'rid': self._rid(), 'method': self._method,
|
|
||||||
'url': self._url,
|
|
||||||
'status': response.status})
|
|
||||||
raise Exception(_('Server error return: %s') %
|
|
||||||
response.status)
|
|
||||||
return response
|
|
||||||
except Exception as e:
|
|
||||||
if isinstance(e, httplib.BadStatusLine):
|
|
||||||
msg = _("Invalid server response")
|
|
||||||
else:
|
|
||||||
msg = unicode(e)
|
|
||||||
lg.warn(_("[%(rid)d] Failed request '%(req)s': %(msg)s "
|
|
||||||
"(%(time)0.2f seconds)"),
|
|
||||||
{'rid': self._rid(), 'req': self._request_str(conn, url),
|
|
||||||
'msg': msg,
|
|
||||||
'time': time.time() - issued_time})
|
|
||||||
self._request_error = e
|
|
||||||
is_conn_error = True
|
|
||||||
return e
|
|
||||||
finally:
|
|
||||||
# Make sure we release the original connection provided by the
|
|
||||||
# acquire_connection() call above.
|
|
||||||
self._api_client.release_connection(acquired_conn, is_conn_error,
|
|
||||||
rid=self._rid())
|
|
||||||
|
|
||||||
def _redirect_params(self, conn, headers):
|
|
||||||
'''Process redirect params from a server response.'''
|
|
||||||
url = None
|
|
||||||
for name, value in headers:
|
|
||||||
if name.lower() == "location":
|
|
||||||
url = value
|
|
||||||
break
|
|
||||||
if not url:
|
|
||||||
lg.warn(_("[%d] Received redirect status without location header "
|
|
||||||
"field"), self._rid())
|
|
||||||
return (conn, None)
|
|
||||||
# Accept location with the following format:
|
|
||||||
# 1. /path, redirect to same node
|
|
||||||
# 2. scheme://hostname:[port]/path where scheme is https or http
|
|
||||||
# Reject others
|
|
||||||
# 3. e.g. relative paths, unsupported scheme, unspecified host
|
|
||||||
result = urlparse.urlparse(url)
|
|
||||||
if not result.scheme and not result.hostname and result.path:
|
|
||||||
if result.path[0] == "/":
|
|
||||||
if result.query:
|
|
||||||
url = "%s?%s" % (result.path, result.query)
|
|
||||||
else:
|
|
||||||
url = result.path
|
|
||||||
return (conn, url) # case 1
|
|
||||||
else:
|
|
||||||
lg.warn(_("[%(rid)d] Received invalid redirect location: "
|
|
||||||
"%(url)s"),
|
|
||||||
{'rid': self._rid(), 'url': url})
|
|
||||||
return (conn, None) # case 3
|
|
||||||
elif result.scheme not in ["http", "https"] or not result.hostname:
|
|
||||||
lg.warn(_("[%(rid)d] Received malformed redirect location: "
|
|
||||||
"%(url)s"),
|
|
||||||
{'rid': self._rid(), 'url': url})
|
|
||||||
return (conn, None) # case 3
|
|
||||||
# case 2, redirect location includes a scheme
|
|
||||||
# so setup a new connection and authenticate
|
|
||||||
use_https = result.scheme == "https"
|
|
||||||
api_providers = [(result.hostname, result.port, use_https)]
|
|
||||||
api_client = client_eventlet.NvpApiClientEventlet(
|
|
||||||
api_providers, self._api_client.user, self._api_client.password,
|
|
||||||
use_https=use_https)
|
|
||||||
api_client.wait_for_login()
|
|
||||||
if api_client.auth_cookie:
|
|
||||||
self._headers["Cookie"] = api_client.auth_cookie
|
|
||||||
else:
|
|
||||||
self._headers["Cookie"] = ""
|
|
||||||
conn = api_client.acquire_connection(rid=self._rid())
|
|
||||||
if result.query:
|
|
||||||
url = "%s?%s" % (result.path, result.query)
|
|
||||||
else:
|
|
||||||
url = result.path
|
|
||||||
return (conn, url)
|
|
||||||
|
|
||||||
def _handle_request(self):
|
def _handle_request(self):
|
||||||
'''First level request handling.'''
|
'''First level request handling.'''
|
||||||
attempt = 0
|
attempt = 0
|
||||||
@ -350,46 +141,41 @@ class NvpApiRequestEventlet:
|
|||||||
while response is None and attempt <= self._retries:
|
while response is None and attempt <= self._retries:
|
||||||
attempt += 1
|
attempt += 1
|
||||||
|
|
||||||
if self._auto_login and self._api_client.need_login:
|
|
||||||
self._api_client.wait_for_login()
|
|
||||||
|
|
||||||
if self._api_client.auth_cookie:
|
|
||||||
self._headers["Cookie"] = self._api_client.auth_cookie
|
|
||||||
|
|
||||||
req = self.spawn(self._issue_request).wait()
|
req = self.spawn(self._issue_request).wait()
|
||||||
# automatically raises any exceptions returned.
|
# automatically raises any exceptions returned.
|
||||||
if isinstance(req, httplib.HTTPResponse):
|
if isinstance(req, httplib.HTTPResponse):
|
||||||
if (req.status == httplib.UNAUTHORIZED
|
if attempt <= self._retries and not self._abort:
|
||||||
|
if (req.status == httplib.UNAUTHORIZED
|
||||||
or req.status == httplib.FORBIDDEN):
|
or req.status == httplib.FORBIDDEN):
|
||||||
self._api_client.need_login = True
|
|
||||||
if attempt <= self._retries:
|
|
||||||
continue
|
continue
|
||||||
# else fall through to return the error code
|
# else fall through to return the error code
|
||||||
|
|
||||||
lg.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
|
LOG.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
|
||||||
": %(status)s"),
|
": %(status)s"),
|
||||||
{'rid': self._rid(), 'method': self._method,
|
{'rid': self._rid(), 'method': self._method,
|
||||||
'url': self._url, 'status': req.status})
|
'url': self._url, 'status': req.status})
|
||||||
self._request_error = None
|
self._request_error = None
|
||||||
response = req
|
response = req
|
||||||
else:
|
else:
|
||||||
lg.info(_('[%(rid)d] Error while handling request: %(req)s'),
|
LOG.info(_('[%(rid)d] Error while handling request: %(req)s'),
|
||||||
{'rid': self._rid(), 'req': req})
|
{'rid': self._rid(), 'req': req})
|
||||||
self._request_error = req
|
self._request_error = req
|
||||||
response = None
|
response = None
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
class NvpLoginRequestEventlet(NvpApiRequestEventlet):
|
class NvpLoginRequestEventlet(NvpApiRequestEventlet):
|
||||||
'''Process a login request.'''
|
'''Process a login request.'''
|
||||||
|
|
||||||
def __init__(self, nvp_client, user, password):
|
def __init__(self, nvp_client, user, password, client_conn=None,
|
||||||
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
headers=None):
|
||||||
|
if headers is None:
|
||||||
|
headers = {}
|
||||||
|
headers.update({"Content-Type": "application/x-www-form-urlencoded"})
|
||||||
body = urllib.urlencode({"username": user, "password": password})
|
body = urllib.urlencode({"username": user, "password": password})
|
||||||
NvpApiRequestEventlet.__init__(
|
NvpApiRequestEventlet.__init__(
|
||||||
self, nvp_client, "/ws.v1/login", "POST", body, headers,
|
self, nvp_client, "/ws.v1/login", "POST", body, headers,
|
||||||
auto_login=False)
|
auto_login=False, client_conn=client_conn)
|
||||||
|
|
||||||
def session_cookie(self):
|
def session_cookie(self):
|
||||||
if self.successful():
|
if self.successful():
|
||||||
@ -398,7 +184,7 @@ class NvpLoginRequestEventlet(NvpApiRequestEventlet):
|
|||||||
|
|
||||||
|
|
||||||
class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
|
class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
|
||||||
'''Get a list of API providers.'''
|
'''Gej a list of API providers.'''
|
||||||
|
|
||||||
def __init__(self, nvp_client):
|
def __init__(self, nvp_client):
|
||||||
url = "/ws.v1/control-cluster/node?fields=roles"
|
url = "/ws.v1/control-cluster/node?fields=roles"
|
||||||
@ -427,8 +213,8 @@ class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
|
|||||||
ret.append(_provider_from_listen_addr(addr))
|
ret.append(_provider_from_listen_addr(addr))
|
||||||
return ret
|
return ret
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
lg.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
|
LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
|
||||||
{'rid': self._rid(), 'e': e})
|
{'rid': self._rid(), 'e': e})
|
||||||
# intentionally fall through
|
# intentionally fall through
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@ -438,12 +224,11 @@ class NvpGenericRequestEventlet(NvpApiRequestEventlet):
|
|||||||
|
|
||||||
def __init__(self, nvp_client, method, url, body, content_type,
|
def __init__(self, nvp_client, method, url, body, content_type,
|
||||||
auto_login=False,
|
auto_login=False,
|
||||||
request_timeout=DEFAULT_REQUEST_TIMEOUT,
|
request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
|
||||||
http_timeout=DEFAULT_HTTP_TIMEOUT,
|
http_timeout=request.DEFAULT_HTTP_TIMEOUT,
|
||||||
retries=DEFAULT_RETRIES,
|
retries=request.DEFAULT_RETRIES,
|
||||||
redirects=DEFAULT_REDIRECTS):
|
redirects=request.DEFAULT_REDIRECTS):
|
||||||
headers = {"Content-Type": content_type}
|
headers = {"Content-Type": content_type}
|
||||||
|
|
||||||
NvpApiRequestEventlet.__init__(
|
NvpApiRequestEventlet.__init__(
|
||||||
self, nvp_client, url, method, body, headers,
|
self, nvp_client, url, method, body, headers,
|
||||||
request_timeout=request_timeout, retries=retries,
|
request_timeout=request_timeout, retries=retries,
|
||||||
|
@ -21,7 +21,7 @@ nvp_opts = [
|
|||||||
cfg.IntOpt('max_lp_per_bridged_ls', default=64),
|
cfg.IntOpt('max_lp_per_bridged_ls', default=64),
|
||||||
cfg.IntOpt('max_lp_per_overlay_ls', default=256),
|
cfg.IntOpt('max_lp_per_overlay_ls', default=256),
|
||||||
cfg.IntOpt('concurrent_connections', default=5),
|
cfg.IntOpt('concurrent_connections', default=5),
|
||||||
cfg.IntOpt('failover_time', default=240),
|
cfg.IntOpt('nvp_gen_timeout', default=-1),
|
||||||
cfg.StrOpt('default_cluster_name')
|
cfg.StrOpt('default_cluster_name')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -134,7 +134,6 @@ class NvpApiRequestEventletTest(unittest.TestCase):
|
|||||||
myconn.__str__.return_value = 'myconn string'
|
myconn.__str__.return_value = 'myconn string'
|
||||||
|
|
||||||
req = self.req
|
req = self.req
|
||||||
req._request_timeout = REQUEST_TIMEOUT = 1
|
|
||||||
req._redirect_params = Mock()
|
req._redirect_params = Mock()
|
||||||
req._redirect_params.return_value = (myconn, 'url')
|
req._redirect_params.return_value = (myconn, 'url')
|
||||||
req._request_str = Mock()
|
req._request_str = Mock()
|
||||||
@ -214,60 +213,48 @@ class NvpApiRequestEventletTest(unittest.TestCase):
|
|||||||
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
||||||
'client_eventlet.NvpApiClientEventlet') as mock:
|
'client_eventlet.NvpApiClientEventlet') as mock:
|
||||||
api_client = mock.return_value
|
api_client = mock.return_value
|
||||||
api_client.wait_for_login.return_value = None
|
self.req._api_client = api_client
|
||||||
api_client.auth_cookie = 'mycookie'
|
|
||||||
api_client.acquire_connection.return_value = True
|
|
||||||
myconn = Mock()
|
myconn = Mock()
|
||||||
(conn, retval) = self.req._redirect_params(
|
(conn, retval) = self.req._redirect_params(
|
||||||
myconn, [('location', 'https://host:1/path')])
|
myconn, [('location', 'https://host:1/path')])
|
||||||
|
|
||||||
self.assertTrue(retval is not None)
|
self.assertTrue(retval is not None)
|
||||||
self.assertTrue(api_client.wait_for_login.called)
|
self.assertTrue(api_client.acquire_redirect_connection.called)
|
||||||
self.assertTrue(api_client.acquire_connection.called)
|
|
||||||
|
|
||||||
def test_redirect_params_setup_htttps_and_query(self):
|
def test_redirect_params_setup_htttps_and_query(self):
|
||||||
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
||||||
'client_eventlet.NvpApiClientEventlet') as mock:
|
'client_eventlet.NvpApiClientEventlet') as mock:
|
||||||
api_client = mock.return_value
|
api_client = mock.return_value
|
||||||
api_client.wait_for_login.return_value = None
|
self.req._api_client = api_client
|
||||||
api_client.auth_cookie = 'mycookie'
|
|
||||||
api_client.acquire_connection.return_value = True
|
|
||||||
myconn = Mock()
|
myconn = Mock()
|
||||||
(conn, retval) = self.req._redirect_params(myconn, [
|
(conn, retval) = self.req._redirect_params(myconn, [
|
||||||
('location', 'https://host:1/path?q=1')])
|
('location', 'https://host:1/path?q=1')])
|
||||||
|
|
||||||
self.assertTrue(retval is not None)
|
self.assertTrue(retval is not None)
|
||||||
self.assertTrue(api_client.wait_for_login.called)
|
self.assertTrue(api_client.acquire_redirect_connection.called)
|
||||||
self.assertTrue(api_client.acquire_connection.called)
|
|
||||||
|
|
||||||
def test_redirect_params_setup_https_connection_no_cookie(self):
|
def test_redirect_params_setup_https_connection_no_cookie(self):
|
||||||
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
||||||
'client_eventlet.NvpApiClientEventlet') as mock:
|
'client_eventlet.NvpApiClientEventlet') as mock:
|
||||||
api_client = mock.return_value
|
api_client = mock.return_value
|
||||||
api_client.wait_for_login.return_value = None
|
self.req._api_client = api_client
|
||||||
api_client.auth_cookie = None
|
|
||||||
api_client.acquire_connection.return_value = True
|
|
||||||
myconn = Mock()
|
myconn = Mock()
|
||||||
(conn, retval) = self.req._redirect_params(myconn, [
|
(conn, retval) = self.req._redirect_params(myconn, [
|
||||||
('location', 'https://host:1/path')])
|
('location', 'https://host:1/path')])
|
||||||
|
|
||||||
self.assertTrue(retval is not None)
|
self.assertTrue(retval is not None)
|
||||||
self.assertTrue(api_client.wait_for_login.called)
|
self.assertTrue(api_client.acquire_redirect_connection.called)
|
||||||
self.assertTrue(api_client.acquire_connection.called)
|
|
||||||
|
|
||||||
def test_redirect_params_setup_https_and_query_no_cookie(self):
|
def test_redirect_params_setup_https_and_query_no_cookie(self):
|
||||||
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
||||||
'client_eventlet.NvpApiClientEventlet') as mock:
|
'client_eventlet.NvpApiClientEventlet') as mock:
|
||||||
api_client = mock.return_value
|
api_client = mock.return_value
|
||||||
api_client.wait_for_login.return_value = None
|
self.req._api_client = api_client
|
||||||
api_client.auth_cookie = None
|
|
||||||
api_client.acquire_connection.return_value = True
|
|
||||||
myconn = Mock()
|
myconn = Mock()
|
||||||
(conn, retval) = self.req._redirect_params(
|
(conn, retval) = self.req._redirect_params(
|
||||||
myconn, [('location', 'https://host:1/path?q=1')])
|
myconn, [('location', 'https://host:1/path?q=1')])
|
||||||
self.assertTrue(retval is not None)
|
self.assertTrue(retval is not None)
|
||||||
self.assertTrue(api_client.wait_for_login.called)
|
self.assertTrue(api_client.acquire_redirect_connection.called)
|
||||||
self.assertTrue(api_client.acquire_connection.called)
|
|
||||||
|
|
||||||
def test_redirect_params_path_only_with_query(self):
|
def test_redirect_params_path_only_with_query(self):
|
||||||
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
|
||||||
|
Loading…
x
Reference in New Issue
Block a user