Port away from some eventlet infrastructure
Add a simple object pool implementation for our connection pool, in place of eventlet.pools.Pool. Also use threading.Lock in place of eventlet.Semaphore. There are still some eventlet modules imported by the code, but we can avoid using them at runtime and clean things up later. We can't remove them now or it'll cause pep8 failures. Change-Id: I380408d1321802de813de541cd0a2d4305c3627c
This commit is contained in:
parent
37bd6923dc
commit
e3c5b99959
@ -28,15 +28,15 @@ AMQP, but is deprecated and predates this code.
|
||||
import collections
|
||||
import inspect
|
||||
import sys
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
from eventlet import queue
|
||||
from eventlet import semaphore
|
||||
from oslo.config import cfg
|
||||
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging._drivers import pool
|
||||
from oslo.messaging.openstack.common import excutils
|
||||
from oslo.messaging.openstack.common.gettextutils import _ # noqa
|
||||
from oslo.messaging.openstack.common import local
|
||||
@ -58,14 +58,12 @@ UNIQUE_ID = '_unique_id'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Pool(pools.Pool):
|
||||
class ConnectionPool(pool.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
def __init__(self, conf, connection_cls, *args, **kwargs):
|
||||
def __init__(self, conf, connection_cls):
|
||||
self.connection_cls = connection_cls
|
||||
self.conf = conf
|
||||
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
|
||||
kwargs.setdefault("order_as_stack", True)
|
||||
super(Pool, self).__init__(*args, **kwargs)
|
||||
super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
|
||||
self.reply_proxy = None
|
||||
|
||||
# TODO(comstud): Timeout connections not used in a while
|
||||
@ -74,8 +72,8 @@ class Pool(pools.Pool):
|
||||
return self.connection_cls(self.conf)
|
||||
|
||||
def empty(self):
|
||||
while self.free_items:
|
||||
self.get().close()
|
||||
for item in self.iter_free:
|
||||
item.close()
|
||||
# Force a new connection pool to be created.
|
||||
# Note that this was added due to failing unit test cases. The issue
|
||||
# is the above "while loop" gets all the cached connections from the
|
||||
@ -88,14 +86,14 @@ class Pool(pools.Pool):
|
||||
self.connection_cls.pool = None
|
||||
|
||||
|
||||
_pool_create_sem = semaphore.Semaphore()
|
||||
_pool_create_sem = threading.Lock()
|
||||
|
||||
|
||||
def get_connection_pool(conf, connection_cls):
|
||||
with _pool_create_sem:
|
||||
# Make sure only one thread tries to create the connection pool.
|
||||
if not connection_cls.pool:
|
||||
connection_cls.pool = Pool(conf, connection_cls)
|
||||
connection_cls.pool = ConnectionPool(conf, connection_cls)
|
||||
return connection_cls.pool
|
||||
|
||||
|
||||
@ -517,7 +515,7 @@ def create_connection(conf, new, connection_pool):
|
||||
return ConnectionContext(conf, connection_pool, pooled=not new)
|
||||
|
||||
|
||||
_reply_proxy_create_sem = semaphore.Semaphore()
|
||||
_reply_proxy_create_sem = threading.Lock()
|
||||
|
||||
|
||||
def multicall(conf, context, topic, msg, timeout, connection_pool):
|
||||
|
91
oslo/messaging/_drivers/pool.py
Normal file
91
oslo/messaging/_drivers/pool.py
Normal file
@ -0,0 +1,91 @@
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import threading
|
||||
|
||||
|
||||
class Pool(object):
|
||||
|
||||
"""A thread-safe object pool.
|
||||
|
||||
Modelled after the eventlet.pools.Pool interface, but designed to be safe
|
||||
when using native threads without the GIL.
|
||||
|
||||
Resizing is not supported.
|
||||
"""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, max_size=4):
|
||||
super(Pool, self).__init__()
|
||||
|
||||
self._max_size = max_size
|
||||
self._current_size = 0
|
||||
self._cond = threading.Condition()
|
||||
|
||||
self._items = collections.deque()
|
||||
|
||||
def put(self, item):
|
||||
"""Return an item to the pool."""
|
||||
with self._cond:
|
||||
self._items.appendleft(item)
|
||||
self._cond.notify()
|
||||
|
||||
def get(self, only_free=False):
|
||||
"""Return an item from the pool, when one is available.
|
||||
|
||||
This may cause the calling thread to block.
|
||||
|
||||
:param only_free: if True, return None if no free item available
|
||||
:type only_free: bool
|
||||
"""
|
||||
with self._cond:
|
||||
while True:
|
||||
try:
|
||||
return self._items.popleft()
|
||||
except IndexError:
|
||||
if only_free:
|
||||
return None
|
||||
|
||||
if self._current_size < self._max_size:
|
||||
self._current_size += 1
|
||||
break
|
||||
|
||||
# FIXME(markmc): timeout needed to allow keyboard interrupt
|
||||
# http://bugs.python.org/issue8844
|
||||
self._cond.wait(timeout=1)
|
||||
|
||||
# We've grabbed a slot and dropped the lock, now do the creation
|
||||
try:
|
||||
return self.create()
|
||||
except Exception:
|
||||
with self._cond:
|
||||
self._current_size -= 1
|
||||
raise
|
||||
|
||||
def iter_free(self):
|
||||
"""Iterate over free items."""
|
||||
with self._cond:
|
||||
while True:
|
||||
try:
|
||||
yield self._items.popleft()
|
||||
except IndexError:
|
||||
break
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self):
|
||||
"""Construct a new item."""
|
Loading…
x
Reference in New Issue
Block a user