diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py index 6065267f4..b946e88e4 100644 --- a/oslo/messaging/_drivers/amqp.py +++ b/oslo/messaging/_drivers/amqp.py @@ -28,15 +28,15 @@ AMQP, but is deprecated and predates this code. import collections import inspect import sys +import threading import uuid from eventlet import greenpool -from eventlet import pools from eventlet import queue -from eventlet import semaphore from oslo.config import cfg from oslo.messaging._drivers import common as rpc_common +from oslo.messaging._drivers import pool from oslo.messaging.openstack.common import excutils from oslo.messaging.openstack.common.gettextutils import _ # noqa from oslo.messaging.openstack.common import local @@ -58,14 +58,12 @@ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) -class Pool(pools.Pool): +class ConnectionPool(pool.Pool): """Class that implements a Pool of Connections.""" - def __init__(self, conf, connection_cls, *args, **kwargs): + def __init__(self, conf, connection_cls): self.connection_cls = connection_cls self.conf = conf - kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size) - kwargs.setdefault("order_as_stack", True) - super(Pool, self).__init__(*args, **kwargs) + super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size) self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while @@ -74,8 +72,8 @@ class Pool(pools.Pool): return self.connection_cls(self.conf) def empty(self): - while self.free_items: - self.get().close() + for item in self.iter_free: + item.close() # Force a new connection pool to be created. # Note that this was added due to failing unit test cases. The issue # is the above "while loop" gets all the cached connections from the @@ -88,14 +86,14 @@ class Pool(pools.Pool): self.connection_cls.pool = None -_pool_create_sem = semaphore.Semaphore() +_pool_create_sem = threading.Lock() def get_connection_pool(conf, connection_cls): with _pool_create_sem: # Make sure only one thread tries to create the connection pool. if not connection_cls.pool: - connection_cls.pool = Pool(conf, connection_cls) + connection_cls.pool = ConnectionPool(conf, connection_cls) return connection_cls.pool @@ -517,7 +515,7 @@ def create_connection(conf, new, connection_pool): return ConnectionContext(conf, connection_pool, pooled=not new) -_reply_proxy_create_sem = semaphore.Semaphore() +_reply_proxy_create_sem = threading.Lock() def multicall(conf, context, topic, msg, timeout, connection_pool): diff --git a/oslo/messaging/_drivers/pool.py b/oslo/messaging/_drivers/pool.py new file mode 100644 index 000000000..b7877d005 --- /dev/null +++ b/oslo/messaging/_drivers/pool.py @@ -0,0 +1,91 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import collections +import threading + + +class Pool(object): + + """A thread-safe object pool. + + Modelled after the eventlet.pools.Pool interface, but designed to be safe + when using native threads without the GIL. + + Resizing is not supported. + """ + + __metaclass__ = abc.ABCMeta + + def __init__(self, max_size=4): + super(Pool, self).__init__() + + self._max_size = max_size + self._current_size = 0 + self._cond = threading.Condition() + + self._items = collections.deque() + + def put(self, item): + """Return an item to the pool.""" + with self._cond: + self._items.appendleft(item) + self._cond.notify() + + def get(self, only_free=False): + """Return an item from the pool, when one is available. + + This may cause the calling thread to block. + + :param only_free: if True, return None if no free item available + :type only_free: bool + """ + with self._cond: + while True: + try: + return self._items.popleft() + except IndexError: + if only_free: + return None + + if self._current_size < self._max_size: + self._current_size += 1 + break + + # FIXME(markmc): timeout needed to allow keyboard interrupt + # http://bugs.python.org/issue8844 + self._cond.wait(timeout=1) + + # We've grabbed a slot and dropped the lock, now do the creation + try: + return self.create() + except Exception: + with self._cond: + self._current_size -= 1 + raise + + def iter_free(self): + """Iterate over free items.""" + with self._cond: + while True: + try: + yield self._items.popleft() + except IndexError: + break + + @abc.abstractmethod + def create(self): + """Construct a new item."""