Acknowledgements implementation
In order to make zmq driver implementation reliable support acknowledgements receiving from server side. Acknowledgements feature is supported only by DEALER/ROUTER publisher/consumer pair because other socket types don't support back-chatter. More pluggable publishers/consumers added (PUSH/PULL). Change-Id: I0d02394561c895575045668b43b4b7946f3a8239
This commit is contained in:
parent
1adf880a23
commit
eb7552bb00
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
import zmq_publisher_base
|
import zmq_publisher_base
|
||||||
@ -29,6 +30,7 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
|
|
||||||
def __init__(self, conf, matchmaker):
|
def __init__(self, conf, matchmaker):
|
||||||
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
|
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
|
||||||
|
self.ack_receiver = AcknowledgementReceiver()
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
|
|
||||||
@ -37,6 +39,17 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
|
|
||||||
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
||||||
|
|
||||||
|
if not dealer_socket.connections:
|
||||||
|
# NOTE(ozamiatin): Here we can provide
|
||||||
|
# a queue for keeping messages to send them later
|
||||||
|
# when some listener appears. However such approach
|
||||||
|
# being more reliable will consume additional memory.
|
||||||
|
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||||
|
% request.msg_type)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.ack_receiver.track_socket(dealer_socket.handle)
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
for _ in range(dealer_socket.connections_count()):
|
for _ in range(dealer_socket.connections_count()):
|
||||||
self._send_request(dealer_socket, request)
|
self._send_request(dealer_socket, request)
|
||||||
@ -45,18 +58,44 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
|
|
||||||
def _send_request(self, socket, request):
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
if not socket.connections:
|
message_id = str(uuid.uuid1())
|
||||||
# NOTE(ozamiatin): Here we can provide
|
|
||||||
# a queue for keeping messages to send them later
|
|
||||||
# when some listener appears. However such approach
|
|
||||||
# being more reliable will consume additional memory.
|
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
|
||||||
% request.msg_type)
|
|
||||||
return
|
|
||||||
|
|
||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
super(DealerPublisher, self)._send_request(socket, request)
|
socket.send_string(request.msg_type, zmq.SNDMORE)
|
||||||
|
socket.send_string(message_id, zmq.SNDMORE)
|
||||||
|
socket.send_json(request.context, zmq.SNDMORE)
|
||||||
|
socket.send_json(request.message)
|
||||||
|
|
||||||
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
||||||
% {"message": request.message,
|
% {"message": request.message,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.ack_receiver.cleanup()
|
||||||
|
super(DealerPublisher, self).cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
class AcknowledgementReceiver(object):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.poller = zmq_async.get_poller()
|
||||||
|
self.thread = zmq_async.get_executor(self.poll_for_acknowledgements)
|
||||||
|
self.thread.execute()
|
||||||
|
|
||||||
|
def _receive_acknowledgement(self, socket):
|
||||||
|
empty = socket.recv()
|
||||||
|
assert empty == b"", "Empty delimiter expected"
|
||||||
|
ack_message = socket.recv_json()
|
||||||
|
return ack_message
|
||||||
|
|
||||||
|
def track_socket(self, socket):
|
||||||
|
self.poller.register(socket, self._receive_acknowledgement)
|
||||||
|
|
||||||
|
def poll_for_acknowledgements(self):
|
||||||
|
ack_message, socket = self.poller.poll()
|
||||||
|
LOG.info(_LI("Message %s acknowledged")
|
||||||
|
% ack_message[zmq_names.FIELD_ID])
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.thread.stop()
|
||||||
|
self.poller.close()
|
||||||
|
@ -0,0 +1,47 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LI
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class PubPublisher(zmq_publisher_base.PublisherMultisend):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker):
|
||||||
|
super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB)
|
||||||
|
|
||||||
|
def send_request(self, request):
|
||||||
|
|
||||||
|
if request.msg_type not in zmq_names.NOTIFY_TYPES:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
|
pub_socket, hosts = self._check_hosts_connections(request.target)
|
||||||
|
self._send_request(pub_socket, request)
|
||||||
|
|
||||||
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
|
super(PubPublisher, self)._send_request(socket, request)
|
||||||
|
|
||||||
|
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
|
||||||
|
% {"message": request.message,
|
||||||
|
"target": request.target})
|
@ -110,8 +110,9 @@ class PublisherMultisend(PublisherBase):
|
|||||||
super(PublisherMultisend, self).__init__(conf, matchmaker)
|
super(PublisherMultisend, self).__init__(conf, matchmaker)
|
||||||
|
|
||||||
def _check_hosts_connections(self, target):
|
def _check_hosts_connections(self, target):
|
||||||
|
# TODO(ozamiatin): Place for significant optimization
|
||||||
|
# Matchmaker cache should be implemented
|
||||||
hosts = self.matchmaker.get_hosts(target)
|
hosts = self.matchmaker.get_hosts(target)
|
||||||
|
|
||||||
if str(target) in self.outbound_sockets:
|
if str(target) in self.outbound_sockets:
|
||||||
socket = self.outbound_sockets[str(target)]
|
socket = self.outbound_sockets[str(target)]
|
||||||
else:
|
else:
|
||||||
|
@ -0,0 +1,57 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LI, _LW
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker):
|
||||||
|
super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH)
|
||||||
|
|
||||||
|
def send_request(self, request):
|
||||||
|
|
||||||
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
|
push_socket, hosts = self._check_hosts_connections(request.target)
|
||||||
|
|
||||||
|
if not push_socket.connections:
|
||||||
|
LOG.warning(_LW("Request %s was dropped because no connection")
|
||||||
|
% request.msg_type)
|
||||||
|
return
|
||||||
|
|
||||||
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
|
for _ in range(push_socket.connections_count()):
|
||||||
|
self._send_request(push_socket, request)
|
||||||
|
else:
|
||||||
|
self._send_request(push_socket, request)
|
||||||
|
|
||||||
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
|
super(PushPublisher, self)._send_request(socket, request)
|
||||||
|
|
||||||
|
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
|
||||||
|
% {"message": request.message,
|
||||||
|
"target": request.target})
|
@ -14,7 +14,6 @@
|
|||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
import zmq_dealer_publisher
|
import zmq_dealer_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
@ -0,0 +1,85 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class ConsumerBase(object):
|
||||||
|
|
||||||
|
def __init__(self, conf, poller, server):
|
||||||
|
self.conf = conf
|
||||||
|
self.poller = poller
|
||||||
|
self.server = server
|
||||||
|
self.sockets = []
|
||||||
|
self.context = zmq.Context()
|
||||||
|
|
||||||
|
def subscribe_socket(self, socket_type):
|
||||||
|
try:
|
||||||
|
socket = zmq_socket.ZmqRandomPortSocket(
|
||||||
|
self.conf, self.context, socket_type)
|
||||||
|
self.sockets.append(socket)
|
||||||
|
self.poller.register(socket, self.receive_message)
|
||||||
|
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
|
||||||
|
{"stype": socket_type,
|
||||||
|
"addr": socket.bind_address,
|
||||||
|
"port": socket.port})
|
||||||
|
return socket
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
||||||
|
% (self.port, e)
|
||||||
|
LOG.error(errmsg)
|
||||||
|
raise rpc_common.RPCException(errmsg)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def listen(self, target):
|
||||||
|
"""Associate new sockets with targets here"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def receive_message(self, target):
|
||||||
|
"""Method for poller - receiving message routine"""
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
for socket in self.sockets:
|
||||||
|
if not socket.handle.closed:
|
||||||
|
socket.setsockopt(zmq.LINGER, 0)
|
||||||
|
socket.close()
|
||||||
|
self.sockets = []
|
||||||
|
|
||||||
|
|
||||||
|
class SingleSocketConsumer(ConsumerBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, poller, server, socket_type):
|
||||||
|
super(SingleSocketConsumer, self).__init__(conf, poller, server)
|
||||||
|
self.socket = self.subscribe_socket(socket_type)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def address(self):
|
||||||
|
return self.socket.bind_address
|
||||||
|
|
||||||
|
@property
|
||||||
|
def port(self):
|
||||||
|
return self.socket.port
|
@ -0,0 +1,69 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import base
|
||||||
|
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||||
|
import zmq_consumer_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class PullIncomingMessage(base.IncomingMessage):
|
||||||
|
|
||||||
|
def __init__(self, listener, context, message):
|
||||||
|
super(PullIncomingMessage, self).__init__(listener, context, message)
|
||||||
|
|
||||||
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
|
"""Reply is not needed for non-call messages."""
|
||||||
|
|
||||||
|
def acknowledge(self):
|
||||||
|
"""Acknowledgments are not supported by this type of consumer."""
|
||||||
|
|
||||||
|
def requeue(self):
|
||||||
|
"""Requeueing is not supported."""
|
||||||
|
|
||||||
|
|
||||||
|
class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||||
|
|
||||||
|
def __init__(self, conf, poller, server):
|
||||||
|
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
|
||||||
|
|
||||||
|
def listen(self, target):
|
||||||
|
LOG.info(_LI("Listen to target %s") % str(target))
|
||||||
|
# Do nothing here because we have a single socket
|
||||||
|
|
||||||
|
def receive_message(self, socket):
|
||||||
|
try:
|
||||||
|
msg_type = socket.recv_string()
|
||||||
|
assert msg_type is not None, 'Bad format: msg type expected'
|
||||||
|
context = socket.recv_json()
|
||||||
|
message = socket.recv_json()
|
||||||
|
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
|
||||||
|
% {"msg_type": msg_type,
|
||||||
|
"msg": str(message)})
|
||||||
|
|
||||||
|
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
||||||
|
return PullIncomingMessage(self.server, context, message)
|
||||||
|
else:
|
||||||
|
LOG.error(_LE("Unknown message type: %s") % msg_type)
|
||||||
|
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
@ -14,9 +14,10 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import base
|
||||||
|
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||||
|
import zmq_consumer_base
|
||||||
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._i18n import _LE, _LI
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
@ -26,46 +27,52 @@ LOG = logging.getLogger(__name__)
|
|||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class RouterConsumer(object):
|
class RouterIncomingMessage(base.IncomingMessage):
|
||||||
|
|
||||||
|
def __init__(self, listener, context, message, socket, reply_id, msg_id,
|
||||||
|
poller):
|
||||||
|
super(RouterIncomingMessage, self).__init__(listener, context, message)
|
||||||
|
self.socket = socket
|
||||||
|
self.reply_id = reply_id
|
||||||
|
self.msg_id = msg_id
|
||||||
|
self.message = message
|
||||||
|
poller.resume_polling(socket)
|
||||||
|
|
||||||
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
|
"""Reply is not needed for non-call messages"""
|
||||||
|
|
||||||
|
def acknowledge(self):
|
||||||
|
LOG.info("Sending acknowledge for %s", self.msg_id)
|
||||||
|
ack_message = {zmq_names.FIELD_ID: self.msg_id}
|
||||||
|
self.socket.send(self.reply_id, zmq.SNDMORE)
|
||||||
|
self.socket.send(b'', zmq.SNDMORE)
|
||||||
|
self.socket.send_json(ack_message)
|
||||||
|
|
||||||
|
def requeue(self):
|
||||||
|
"""Requeue is not supported"""
|
||||||
|
|
||||||
|
|
||||||
|
class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||||
|
|
||||||
def __init__(self, conf, poller, server):
|
def __init__(self, conf, poller, server):
|
||||||
|
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
|
||||||
self.conf = conf
|
|
||||||
self.poller = poller
|
|
||||||
self.server = server
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.context = zmq.Context()
|
|
||||||
self.socket = self.context.socket(zmq.ROUTER)
|
|
||||||
self.address = zmq_address.get_tcp_random_address(conf)
|
|
||||||
self.port = self.socket.bind_to_random_port(self.address)
|
|
||||||
self.poller.register(self.socket, self._receive_message)
|
|
||||||
LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"),
|
|
||||||
{"addr": self.address,
|
|
||||||
"port": self.port})
|
|
||||||
except zmq.ZMQError as e:
|
|
||||||
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
|
||||||
% (self.port, e)
|
|
||||||
LOG.error(errmsg)
|
|
||||||
raise rpc_common.RPCException(errmsg)
|
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
LOG.info(_LI("Listen to target %s") % str(target))
|
LOG.info(_LI("Listen to target %s") % str(target))
|
||||||
# Do nothing here because we have single socket
|
# Do nothing here because we have a single socket
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
if not self.socket.closed:
|
|
||||||
self.socket.setsockopt(zmq.LINGER, 0)
|
|
||||||
self.socket.close()
|
|
||||||
|
|
||||||
def _receive_message(self, socket):
|
|
||||||
|
|
||||||
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
reply_id = socket.recv()
|
reply_id = socket.recv()
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||||
msg_type = socket.recv_string()
|
msg_type = socket.recv_string()
|
||||||
assert msg_type is not None, 'Bad format: msg type expected'
|
assert msg_type is not None, 'Bad format: msg type expected'
|
||||||
|
|
||||||
|
msg_id = None
|
||||||
|
if msg_type != zmq_names.CALL_TYPE:
|
||||||
|
msg_id = socket.recv_string()
|
||||||
|
|
||||||
context = socket.recv_json()
|
context = socket.recv_json()
|
||||||
message = socket.recv_json()
|
message = socket.recv_json()
|
||||||
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
|
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
|
||||||
@ -76,12 +83,10 @@ class RouterConsumer(object):
|
|||||||
return zmq_incoming_message.ZmqIncomingRequest(
|
return zmq_incoming_message.ZmqIncomingRequest(
|
||||||
self.server, context, message, socket, reply_id,
|
self.server, context, message, socket, reply_id,
|
||||||
self.poller)
|
self.poller)
|
||||||
elif msg_type in zmq_names.CAST_TYPES:
|
elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
||||||
return zmq_incoming_message.ZmqCastMessage(
|
return RouterIncomingMessage(
|
||||||
self.server, context, message, socket, self.poller)
|
self.server, context, message, socket, reply_id,
|
||||||
elif msg_type in zmq_names.NOTIFY_TYPES:
|
msg_id, self.poller)
|
||||||
return zmq_incoming_message.ZmqNotificationMessage(
|
|
||||||
self.server, context, message, socket, self.poller)
|
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE("Unknown message type: %s") % msg_type)
|
LOG.error(_LE("Unknown message type: %s") % msg_type)
|
||||||
|
|
||||||
|
@ -42,48 +42,14 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
|||||||
message_reply = {zmq_names.FIELD_REPLY: reply,
|
message_reply = {zmq_names.FIELD_REPLY: reply,
|
||||||
zmq_names.FIELD_FAILURE: failure,
|
zmq_names.FIELD_FAILURE: failure,
|
||||||
zmq_names.FIELD_LOG_FAILURE: log_failure}
|
zmq_names.FIELD_LOG_FAILURE: log_failure}
|
||||||
LOG.debug("Replying %s REP", (str(message_reply)))
|
|
||||||
|
LOG.info("Replying %s REP", (str(message_reply)))
|
||||||
|
|
||||||
self.received = True
|
self.received = True
|
||||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||||
self.reply_socket.send_json(message_reply)
|
self.reply_socket.send_json(message_reply)
|
||||||
self.poller.resume_polling(self.reply_socket)
|
self.poller.resume_polling(self.reply_socket)
|
||||||
|
|
||||||
def acknowledge(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
pass
|
"""Requeue is not supported"""
|
||||||
|
|
||||||
|
|
||||||
class ZmqCastMessage(base.IncomingMessage):
|
|
||||||
|
|
||||||
def __init__(self, listener, context, message, socket, poller):
|
|
||||||
super(ZmqCastMessage, self).__init__(listener, context, message)
|
|
||||||
poller.resume_polling(socket)
|
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
|
||||||
"""Reply is not needed for fanout(cast) messages"""
|
|
||||||
|
|
||||||
def acknowledge(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def requeue(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ZmqNotificationMessage(base.IncomingMessage):
|
|
||||||
|
|
||||||
def __init__(self, listener, context, message, socket, poller):
|
|
||||||
super(ZmqNotificationMessage, self).__init__(listener, context,
|
|
||||||
message)
|
|
||||||
poller.resume_polling(socket)
|
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
|
||||||
"""Reply is not needed for notification messages"""
|
|
||||||
|
|
||||||
def acknowledge(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def requeue(self):
|
|
||||||
pass
|
|
||||||
|
@ -19,6 +19,8 @@ zmq = zmq_async.import_zmq()
|
|||||||
|
|
||||||
ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER",
|
ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER",
|
||||||
zmq.ROUTER: "ROUTER",
|
zmq.ROUTER: "ROUTER",
|
||||||
|
zmq.PUSH: "PUSH",
|
||||||
|
zmq.PULL: "PULL",
|
||||||
zmq.REQ: "REQ",
|
zmq.REQ: "REQ",
|
||||||
zmq.REP: "REP",
|
zmq.REP: "REP",
|
||||||
zmq.PUB: "PUB",
|
zmq.PUB: "PUB",
|
||||||
@ -27,6 +29,7 @@ ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER",
|
|||||||
FIELD_FAILURE = 'failure'
|
FIELD_FAILURE = 'failure'
|
||||||
FIELD_REPLY = 'reply'
|
FIELD_REPLY = 'reply'
|
||||||
FIELD_LOG_FAILURE = 'log_failure'
|
FIELD_LOG_FAILURE = 'log_failure'
|
||||||
|
FIELD_ID = 'id'
|
||||||
|
|
||||||
CALL_TYPE = 'call'
|
CALL_TYPE = 'call'
|
||||||
CAST_TYPE = 'cast'
|
CAST_TYPE = 'cast'
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
|
||||||
@ -31,7 +32,7 @@ class ZmqSocket(object):
|
|||||||
self.connections = set()
|
self.connections = set()
|
||||||
|
|
||||||
def type_name(self):
|
def type_name(self):
|
||||||
return zmq_names(self.socket_type)
|
return zmq_names.socket_type_str(self.socket_type)
|
||||||
|
|
||||||
def connections_count(self):
|
def connections_count(self):
|
||||||
return len(self.connections)
|
return len(self.connections)
|
||||||
@ -53,5 +54,23 @@ class ZmqSocket(object):
|
|||||||
def send_json(self, *args, **kwargs):
|
def send_json(self, *args, **kwargs):
|
||||||
self.handle.send_json(*args, **kwargs)
|
self.handle.send_json(*args, **kwargs)
|
||||||
|
|
||||||
|
def recv(self, *args, **kwargs):
|
||||||
|
return self.handle.recv(*args, **kwargs)
|
||||||
|
|
||||||
|
def recv_string(self, *args, **kwargs):
|
||||||
|
return self.handle.recv_string(*args, **kwargs)
|
||||||
|
|
||||||
|
def recv_json(self, *args, **kwargs):
|
||||||
|
return self.handle.recv_json(*args, **kwargs)
|
||||||
|
|
||||||
def close(self, *args, **kwargs):
|
def close(self, *args, **kwargs):
|
||||||
self.handle.close(*args, **kwargs)
|
self.handle.close(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqRandomPortSocket(ZmqSocket):
|
||||||
|
|
||||||
|
def __init__(self, conf, context, socket_type):
|
||||||
|
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
|
||||||
|
self.conf = conf
|
||||||
|
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||||
|
self.port = self.handle.bind_to_random_port(self.bind_address)
|
||||||
|
@ -52,6 +52,7 @@ class TestServerListener(object):
|
|||||||
try:
|
try:
|
||||||
message = self.listener.poll()
|
message = self.listener.poll()
|
||||||
if message is not None:
|
if message is not None:
|
||||||
|
message.acknowledge()
|
||||||
self._received.set()
|
self._received.set()
|
||||||
self.message = message
|
self.message = message
|
||||||
message.reply(reply=True)
|
message.reply(reply=True)
|
||||||
@ -188,7 +189,7 @@ class TestZmqBasics(ZmqBaseTestCase):
|
|||||||
context = {}
|
context = {}
|
||||||
target.topic = target.topic + '.info'
|
target.topic = target.topic + '.info'
|
||||||
self.driver.send_notification(target, context, message, '3.0')
|
self.driver.send_notification(target, context, message, '3.0')
|
||||||
self.listener._received.wait()
|
self.listener._received.wait(5)
|
||||||
self.assertTrue(self.listener._received.isSet())
|
self.assertTrue(self.listener._received.isSet())
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user