026a656893
This patch fixes the pep8 job which was failing with: N341 _ from python builtins module is used. Use _ from vmware_nsx._i18n instead Change-Id: I5be1646d6505dd3b2383abb28234f3ab612549a6
157 lines
6.5 KiB
Python
157 lines
6.5 KiB
Python
# Copyright 2012 VMware, Inc.
|
|
#
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import time
|
|
|
|
import eventlet
|
|
eventlet.monkey_patch()
|
|
from oslo_log import log as logging
|
|
|
|
from vmware_nsx._i18n import _LE
|
|
from vmware_nsx.api_client import base
|
|
from vmware_nsx.api_client import eventlet_request
|
|
|
|
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class EventletApiClient(base.ApiClientBase):
    """Eventlet-based implementation of NSX ApiClient ABC."""

    def __init__(self, api_providers, user, password,
                 concurrent_connections=base.DEFAULT_CONCURRENT_CONNECTIONS,
                 gen_timeout=base.GENERATION_ID_TIMEOUT,
                 use_https=True,
                 connect_timeout=base.DEFAULT_CONNECT_TIMEOUT):
        """Constructor.

        :param api_providers: a list of tuples of the form: (host, port,
            is_ssl).
        :param user: login username.
        :param password: login password.
        :param concurrent_connections: number of connections created up
            front for each API provider.
        :param gen_timeout: controls how long the generation id is kept;
            if set to -1 the generation id is never timed out.
        :param use_https: whether or not to use https for requests.
        :param connect_timeout: connection timeout in seconds.
        """
        if not api_providers:
            api_providers = []
        self._api_providers = {tuple(p) for p in api_providers}
        self._api_provider_data = {}  # tuple(semaphore, session_cookie)
        for p in self._api_providers:
            self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None))
        self._user = user
        self._password = password
        self._concurrent_connections = concurrent_connections
        self._use_https = use_https
        self._connect_timeout = connect_timeout
        self._config_gen = None
        self._config_gen_ts = None
        self._gen_timeout = gen_timeout

        # The connection pool is a single priority queue holding
        # (priority, connection) tuples.  Connections are created
        # round-robin across the providers and each one gets the next
        # priority number, starting at 1.
        self._conn_pool = eventlet.queue.PriorityQueue()
        self._next_conn_priority = 1
        for __ in range(concurrent_connections):
            for host, port, is_ssl in api_providers:
                conn = self._create_connection(host, port, is_ssl)
                self._conn_pool.put((self._next_conn_priority, conn))
                self._next_conn_priority += 1

    def acquire_redirect_connection(self, conn_params, auto_login=True,
                                    headers=None):
        """Check out or create connection to redirected NSX API server.

        Args:
            conn_params: tuple specifying target of redirect, see
                self._conn_params()
            auto_login: returned connection should have valid session cookie
            headers: headers to pass on if auto_login

        Returns: An available HTTPConnection instance corresponding to the
                 specified conn_params. If a connection did not previously
                 exist, new connections are created with the highest priority
                 in the connection pool and one of these new connections
                 returned.
        """
        result_conn = None
        data = self._get_provider_data(conn_params)
        if data:
            # redirect target already exists in provider data and connections
            # to the provider have been added to the connection pool. Try to
            # obtain a connection from the pool, note that it's possible that
            # all connection to the provider are currently in use.
            skipped = []
            while not self._conn_pool.empty():
                priority, pooled_conn = self._conn_pool.get_nowait()
                if (not result_conn and
                        self._conn_params(pooled_conn) == conn_params):
                    pooled_conn.priority = priority
                    result_conn = pooled_conn
                else:
                    skipped.append((priority, pooled_conn))
            # Hand every connection we did not pick back to the pool.
            for priority, pooled_conn in skipped:
                self._conn_pool.put((priority, pooled_conn))
            # hack: if no free connections available, create new connection
            # and stash "no_release" attribute (so that we only exceed
            # self._concurrent_connections temporarily)
            if not result_conn:
                result_conn = self._create_connection(*conn_params)
                # redirect connections have highest priority
                result_conn.priority = 0
                result_conn.no_release = True
        else:
            # redirect target not already known, setup provider lists
            self._api_providers.update([conn_params])
            self._set_provider_data(conn_params,
                                    (eventlet.semaphore.Semaphore(1), None))
            # redirects occur during cluster upgrades, i.e. results to old
            # redirects to new, so give redirect targets highest priority
            priority = 0
            # Pool all but one of the new connections; the last one is
            # handed straight to the caller.  Building the returned
            # connection outside the loop also avoids an unbound local when
            # self._concurrent_connections is less than 1.
            for __ in range(self._concurrent_connections - 1):
                pooled_conn = self._create_connection(*conn_params)
                pooled_conn.priority = priority
                self._conn_pool.put((priority, pooled_conn))
            result_conn = self._create_connection(*conn_params)
            result_conn.priority = priority
        if result_conn:
            result_conn.last_used = time.time()
            # Check the cookie of the connection actually being returned.
            # A leftover loop variable may refer to a connection for a
            # different provider.
            if auto_login and self.auth_cookie(result_conn) is None:
                self._wait_for_login(result_conn, headers)
        return result_conn

    def _login(self, conn=None, headers=None):
        """Issue login request and update authentication cookie.

        :param conn: connection passed through to the login request.
        :param headers: headers passed through to the login request.
        :returns: the "Set-Cookie" header of the login response, or None
            when the login request yields a falsy result or the header is
            absent.
        :raises: the exception instance returned by the login request, if
            it returned one instead of a response.
        """
        cookie = None
        g = eventlet_request.LoginRequestEventlet(
            self, self._user, self._password, conn, headers)
        g.start()
        ret = g.join()
        if ret:
            # The request reports failure by returning the exception
            # object rather than raising it; re-raise it here.
            if isinstance(ret, Exception):
                LOG.error(_LE('Login error "%s"'), ret)
                raise ret

            cookie = ret.getheader("Set-Cookie")
            if cookie:
                # NOTE(review): this writes the session cookie to the debug
                # log; consider masking it if debug logs may be shared.
                LOG.debug("Saving new authentication cookie '%s'", cookie)

        return cookie
|
|
|
|
# Register as subclass.
|
|
# NOTE(review): EventletApiClient already lists base.ApiClientBase as its
# base class in its class statement, so this explicit register() call looks
# redundant - confirm nothing relies on it before removing.
base.ApiClientBase.register(EventletApiClient)
|