From cf08efcd3311853cf3ba276952e3d1fca4ce9f2a Mon Sep 17 00:00:00 2001 From: Serg Melikyan Date: Tue, 11 Mar 2014 19:36:35 +0400 Subject: [PATCH] Replace Puka with Kombu Replace Puka messaging library with Kombu, since Kombu is available in global-requirements. Change-Id: I605b02e9ff65432c59d2aecb2d67c6fd9e5594bb --- muranocommon/messaging/message.py | 11 +-- muranocommon/messaging/mqclient.py | 92 +++++++++++++------------- muranocommon/messaging/subscription.py | 54 ++++++++++----- requirements.txt | 3 +- 4 files changed, 91 insertions(+), 69 deletions(-) diff --git a/muranocommon/messaging/message.py b/muranocommon/messaging/message.py index 67dc8a7..515efc7 100644 --- a/muranocommon/messaging/message.py +++ b/muranocommon/messaging/message.py @@ -20,14 +20,15 @@ log = logging.getLogger("murano-common.messaging") class Message(object): - def __init__(self, client=None, message_handle=None): - self._client = client + def __init__(self, connection=None, message_handle=None): + self._body = None + self._connection = connection self._message_handle = message_handle self.id = None if message_handle is None else \ - message_handle['headers'].get('message_id') + message_handle.headers.get('message_id') try: self.body = None if message_handle is None else \ - anyjson.loads(message_handle['body']) + anyjson.loads(message_handle.body) except ValueError as e: self.body = None log.exception(e) @@ -49,4 +50,4 @@ class Message(object): self._id = value or '' def ack(self): - self._client.basic_ack(self._message_handle) + self._message_handle.ack() diff --git a/muranocommon/messaging/mqclient.py b/muranocommon/messaging/mqclient.py index c43a64c..6a70174 100644 --- a/muranocommon/messaging/mqclient.py +++ b/muranocommon/messaging/mqclient.py @@ -13,30 +13,39 @@ # See the License for the specific language governing permissions and # limitations under the License. -from eventlet import patcher -puka = patcher.import_patched('puka') import anyjson +import logging +import ssl as ssl_module + +from eventlet import patcher +kombu = patcher.import_patched('kombu') from subscription import Subscription +log = logging.getLogger("murano-common.messaging") + + class MqClient(object): def __init__(self, login, password, host, port, virtual_host, ssl=False, ca_certs=None): - scheme = 'amqp:' if not ssl else 'amqps:' + ssl_params = None - ssl_parameters = None - if ssl: - ssl_parameters = puka.SslConnectionParameters() - ssl_parameters.ca_certs = ca_certs + if ssl is True: + ssl_params = { + 'ca_certs': ca_certs, + 'cert_reqs': ssl_module.CERT_REQUIRED + } - self._client = puka.Client('{0}//{1}:{2}@{3}:{4}/{5}'.format( - scheme, - login, - password, - host, - port, - virtual_host - ), ssl_parameters=ssl_parameters) + self._connection = kombu.Connection( + 'amqp://{0}:{1}@{2}:{3}/{4}'.format( + login, + password, + host, + port, + virtual_host + ), ssl=ssl_params + ) + self._channel = None self._connected = False def __enter__(self): @@ -48,19 +57,19 @@ class MqClient(object): return False def connect(self): - if not self._connected: - promise = self._client.connect() - if self._client.wait(promise, timeout=10) is not None: - self._connected = True + self._connection.connect() + self._channel = self._connection.channel() + self._connected = True def close(self): - if self._connected: - promise = self._client.close() - self._client.wait(promise) - self._connected = False + self._connection.close() + self._connected = False - def declare(self, queue, exchange=None, enable_ha=False, ttl=0): - queue_args = {} + def declare(self, queue, exchange='', enable_ha=False, ttl=0): + if not self._connected: + raise RuntimeError('Not connected to RabbitMQ') + + queue_arguments = {} if enable_ha is True: # To use mirrored queues feature in RabbitMQ 2.x # we need to declare this policy on the queue itself. @@ -68,39 +77,32 @@ class MqClient(object): # Warning: this option has no effect on RabbitMQ 3.X, # to enable mirrored queues feature in RabbitMQ 3.X, please # configure RabbitMQ. - queue_args['x-ha-policy'] = 'all' + queue_arguments['x-ha-policy'] = 'all' if ttl > 0: - queue_args['x-expires'] = ttl + queue_arguments['x-expires'] = ttl - promise = self._client.queue_declare( - str(queue), durable=True, arguments=queue_args - ) - self._client.wait(promise) + exchange = kombu.Exchange(exchange, type='direct', durable=True) + queue = kombu.Queue(queue, exchange, queue, durable=True, + queue_arguments=queue_arguments) + bound_queue = queue(self._connection) + bound_queue.declare() - if exchange: - promise = self._client.exchange_declare( - str(exchange), - durable=True) - self._client.wait(promise) - promise = self._client.queue_bind( - str(queue), str(exchange), routing_key=str(queue)) - self._client.wait(promise) - - def send(self, message, key, exchange='', timeout=None): + def send(self, message, key, exchange=''): if not self._connected: raise RuntimeError('Not connected to RabbitMQ') headers = {'message_id': str(message.id)} - promise = self._client.basic_publish( + producer = kombu.Producer(self._connection) + producer.publish( exchange=str(exchange), routing_key=str(key), body=anyjson.dumps(message.body), - headers=headers) - self._client.wait(promise, timeout=timeout) + headers=headers + ) def open(self, queue, prefetch_count=1): if not self._connected: raise RuntimeError('Not connected to RabbitMQ') - return Subscription(self._client, queue, prefetch_count) + return Subscription(self._connection, queue, prefetch_count) diff --git a/muranocommon/messaging/subscription.py b/muranocommon/messaging/subscription.py index c199d72..b42c0ae 100644 --- a/muranocommon/messaging/subscription.py +++ b/muranocommon/messaging/subscription.py @@ -13,33 +13,53 @@ # See the License for the specific language governing permissions and # limitations under the License. -from message import Message +import collections +import socket + +from eventlet import patcher +kombu = patcher.import_patched('kombu') +five = patcher.import_patched('kombu.five') +from . import message class Subscription(object): - def __init__(self, client, queue, prefetch_count=1): - self._client = client - self._queue = queue - self._promise = None - self._prefetch_count = prefetch_count + def __init__(self, connection, queue, prefetch_count=1): + self._buffer = collections.deque() + self._connection = connection + self._queue = kombu.Queue(name=queue, exchange=None) + self._consumer = kombu.Consumer(self._connection, auto_declare=False) + self._consumer.register_callback(self._receive) + self._consumer.qos(prefetch_count=prefetch_count) def __enter__(self): - self._promise = self._client.basic_consume( - queue=self._queue, - prefetch_count=self._prefetch_count) + self._consumer.add_queue(self._queue) + self._consumer.consume() return self def __exit__(self, exc_type, exc_val, exc_tb): - promise = self._client.basic_cancel(self._promise) - self._client.wait(promise) + if self._consumer is not None: + self._consumer.cancel() return False def get_message(self, timeout=None): - if not self._promise: - raise RuntimeError( - "Subscription object must be used within 'with' block") - msg_handle = self._client.wait(self._promise, timeout=timeout) + msg_handle = self._get(timeout=timeout) if msg_handle is None: return None - msg = Message(self._client, msg_handle) - return msg + return message.Message(self._connection, msg_handle) + + def _get(self, timeout=None): + elapsed = 0.0 + remaining = timeout + while True: + time_start = five.monotonic() + if self._buffer: + return self._buffer.pop() + try: + self._connection.drain_events(timeout=timeout and remaining) + except socket.timeout: + return None + elapsed += five.monotonic() - time_start + remaining = timeout and timeout - elapsed or None + + def _receive(self, message_data, message): + self._buffer.append(message) diff --git a/requirements.txt b/requirements.txt index c50f2f5..36776a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ pbr>=0.6,<1.0 anyjson>=0.3.3 eventlet>=0.13.0 - -http://github.com/istalker2/puka/releases/download/1.0.7d/puka-1.0.7d.tar.gz#egg=puka-1.0.7d +kombu>=2.4.8