Notifier implementation
Notifier implementation for zmq driver (ROUTER/DEALER variant). Publishers/consumers refactoring in order to make them pluggable. Change-Id: I2dd42cc805aa72b929a4dfa17498cd8b9c0ed7af
This commit is contained in:
parent
64831f29ee
commit
141f59bd9b
@ -111,7 +111,7 @@ class BaseDriver(object):
|
|||||||
"""Construct a Listener for the given target."""
|
"""Construct a Listener for the given target."""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def listen_for_notifications(self, targets_and_priorities):
|
def listen_for_notifications(self, targets_and_priorities, pool):
|
||||||
"""Construct a notification Listener for the given list of
|
"""Construct a notification Listener for the given list of
|
||||||
tuple of (target, priority).
|
tuple of (target, priority).
|
||||||
"""
|
"""
|
||||||
|
@ -21,8 +21,8 @@ from stevedore import driver
|
|||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_client
|
from oslo_messaging._drivers.zmq_driver.client import zmq_client
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_server
|
from oslo_messaging._drivers.zmq_driver.server import zmq_server
|
||||||
from oslo_messaging._executors import base as executor_base
|
from oslo_messaging._executors import base as executor_base
|
||||||
|
|
||||||
|
|
||||||
@ -108,21 +108,28 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
|
|
||||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||||
retry=None):
|
retry=None):
|
||||||
|
timeout = timeout or self.conf.rpc_response_timeout
|
||||||
if wait_for_reply:
|
if wait_for_reply:
|
||||||
return self.client.call(target, ctxt, message, timeout, retry)
|
return self.client.send_call(target, ctxt, message, timeout, retry)
|
||||||
|
elif target.fanout:
|
||||||
|
self.client.send_fanout(target, ctxt, message, timeout, retry)
|
||||||
else:
|
else:
|
||||||
self.client.cast(target, ctxt, message, timeout, retry)
|
self.client.send_cast(target, ctxt, message, timeout, retry)
|
||||||
return None
|
|
||||||
|
|
||||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||||
return None
|
if target.fanout:
|
||||||
|
self.client.send_notify_fanout(target, ctxt, message, version,
|
||||||
|
retry)
|
||||||
|
else:
|
||||||
|
self.client.send_notify(target, ctxt, message, version, retry)
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
self.server.listen(target)
|
self.server.listen(target)
|
||||||
return self.server
|
return self.server
|
||||||
|
|
||||||
def listen_for_notifications(self, targets_and_priorities, pool):
|
def listen_for_notifications(self, targets_and_priorities, pool):
|
||||||
return None
|
self.server.listen_notification(targets_and_priorities)
|
||||||
|
return self.server
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.client.cleanup()
|
self.client.cleanup()
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class DealerPublisher(zmq_publisher_base.PublisherBase):
|
||||||
|
|
||||||
|
def send_request(self, request):
|
||||||
|
|
||||||
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
|
dealer_socket, hosts = self._check_hosts_connections(request.target)
|
||||||
|
|
||||||
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
|
for _ in range(len(hosts)):
|
||||||
|
self._send_request(dealer_socket, request)
|
||||||
|
else:
|
||||||
|
self._send_request(dealer_socket, request)
|
||||||
|
|
||||||
|
def _send_request(self, socket, request):
|
||||||
|
|
||||||
|
socket.send(b'', zmq.SNDMORE)
|
||||||
|
super(DealerPublisher, self)._send_request(socket, request)
|
||||||
|
|
||||||
|
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
|
||||||
|
% {"message": request.message,
|
||||||
|
"target": request.target})
|
||||||
|
|
||||||
|
def _check_hosts_connections(self, target):
|
||||||
|
if str(target) in self.outbound_sockets:
|
||||||
|
dealer_socket, hosts = self.outbound_sockets[str(target)]
|
||||||
|
else:
|
||||||
|
dealer_socket = zmq.Context().socket(zmq.DEALER)
|
||||||
|
hosts = self.matchmaker.get_hosts(target)
|
||||||
|
for host in hosts:
|
||||||
|
self._connect_to_host(dealer_socket, host, target)
|
||||||
|
self.outbound_sockets[str(target)] = (dealer_socket, hosts)
|
||||||
|
return dealer_socket, hosts
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _connect_to_host(socket, host, target):
|
||||||
|
address = zmq_address.get_tcp_direct_address(host)
|
||||||
|
try:
|
||||||
|
LOG.info(_LI("Connecting DEALER to %(address)s for %(target)s")
|
||||||
|
% {"address": address,
|
||||||
|
"target": target})
|
||||||
|
socket.connect(address)
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\
|
||||||
|
% (address, e)
|
||||||
|
LOG.error(errmsg)
|
||||||
|
raise rpc_common.RPCException(errmsg)
|
@ -0,0 +1,56 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
|
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class UnsupportedSendPattern(rpc_common.RPCException):
|
||||||
|
|
||||||
|
def __init__(self, pattern_name):
|
||||||
|
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
|
||||||
|
super(UnsupportedSendPattern, self).__init__(errmsg)
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class PublisherBase(object):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker):
|
||||||
|
self.conf = conf
|
||||||
|
self.zmq_context = zmq.Context()
|
||||||
|
self.matchmaker = matchmaker
|
||||||
|
self.outbound_sockets = {}
|
||||||
|
super(PublisherBase, self).__init__()
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def send_request(self, request):
|
||||||
|
"""Send request to consumer"""
|
||||||
|
|
||||||
|
def _send_request(self, socket, request):
|
||||||
|
socket.send_string(request.msg_type, zmq.SNDMORE)
|
||||||
|
socket.send_json(request.context, zmq.SNDMORE)
|
||||||
|
socket.send_json(request.message)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
for socket, hosts in self.outbound_sockets.values():
|
||||||
|
socket.setsockopt(zmq.LINGER, 0)
|
||||||
|
socket.close()
|
@ -0,0 +1,85 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import oslo_messaging
|
||||||
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class ReqPublisher(zmq_publisher_base.PublisherBase):
|
||||||
|
|
||||||
|
def send_request(self, request):
|
||||||
|
|
||||||
|
if request.msg_type != zmq_names.CALL_TYPE:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
|
socket = self._connect_to_host(request.target)
|
||||||
|
self._send_request(socket, request)
|
||||||
|
return self._receive_reply(socket, request)
|
||||||
|
|
||||||
|
def _connect_to_host(self, target):
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.zmq_context = zmq.Context()
|
||||||
|
socket = self.zmq_context.socket(zmq.REQ)
|
||||||
|
|
||||||
|
host = self.matchmaker.get_single_host(target)
|
||||||
|
connect_address = zmq_address.get_tcp_direct_address(host)
|
||||||
|
|
||||||
|
LOG.info(_LI("Connecting REQ to %s") % connect_address)
|
||||||
|
|
||||||
|
socket.connect(connect_address)
|
||||||
|
self.outbound_sockets[str(target)] = (socket, [host])
|
||||||
|
return socket
|
||||||
|
|
||||||
|
except zmq.ZMQError as e:
|
||||||
|
errmsg = _LE("Error connecting to socket: %s") % str(e)
|
||||||
|
LOG.error(errmsg)
|
||||||
|
raise rpc_common.RPCException(errmsg)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _receive_reply(socket, request):
|
||||||
|
|
||||||
|
def _receive_method(socket):
|
||||||
|
return socket.recv_json()
|
||||||
|
|
||||||
|
# NOTE(ozamiatin): Check for retry here (no retries now)
|
||||||
|
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
|
||||||
|
poller.register(socket, recv_method=_receive_method)
|
||||||
|
reply, socket = poller.poll(timeout=request.timeout)
|
||||||
|
if reply is None:
|
||||||
|
raise oslo_messaging.MessagingTimeout(
|
||||||
|
"Timeout %s seconds was reached" % request.timeout)
|
||||||
|
if reply[zmq_names.FIELD_FAILURE]:
|
||||||
|
raise rpc_common.deserialize_remote_exception(
|
||||||
|
reply[zmq_names.FIELD_FAILURE],
|
||||||
|
request.allowed_remote_exmods)
|
||||||
|
else:
|
||||||
|
return reply[zmq_names.FIELD_REPLY]
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
# For contextlib compatibility
|
||||||
|
self.cleanup()
|
73
oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
Normal file
73
oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_dealer_publisher
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
|
import zmq_req_publisher
|
||||||
|
from oslo_messaging._drivers.zmq_driver.client import zmq_request
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqClient(object):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||||
|
self.conf = conf
|
||||||
|
self.context = zmq.Context()
|
||||||
|
self.matchmaker = matchmaker
|
||||||
|
self.allowed_remote_exmods = allowed_remote_exmods or []
|
||||||
|
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
|
||||||
|
conf, matchmaker)
|
||||||
|
|
||||||
|
def send_call(self, target, context, message, timeout=None, retry=None):
|
||||||
|
with contextlib.closing(zmq_request.CallRequest(
|
||||||
|
target, context=context, message=message,
|
||||||
|
timeout=timeout, retry=retry,
|
||||||
|
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
|
||||||
|
with contextlib.closing(zmq_req_publisher.ReqPublisher(
|
||||||
|
self.conf, self.matchmaker)) as req_publisher:
|
||||||
|
return req_publisher.send_request(request)
|
||||||
|
|
||||||
|
def send_cast(self, target, context, message, timeout=None, retry=None):
|
||||||
|
with contextlib.closing(zmq_request.CastRequest(
|
||||||
|
target, context=context, message=message,
|
||||||
|
timeout=timeout, retry=retry)) as request:
|
||||||
|
self.dealer_publisher.send_request(request)
|
||||||
|
|
||||||
|
def send_fanout(self, target, context, message, timeout=None, retry=None):
|
||||||
|
with contextlib.closing(zmq_request.FanoutRequest(
|
||||||
|
target, context=context, message=message,
|
||||||
|
timeout=timeout, retry=retry)) as request:
|
||||||
|
self.dealer_publisher.send_request(request)
|
||||||
|
|
||||||
|
def send_notify(self, target, context, message, version, retry=None):
|
||||||
|
with contextlib.closing(zmq_request.NotificationRequest(
|
||||||
|
target, context, message, version=version,
|
||||||
|
retry=retry)) as request:
|
||||||
|
self.dealer_publisher.send_request(request)
|
||||||
|
|
||||||
|
def send_notify_fanout(self, target, context, message, version,
|
||||||
|
retry=None):
|
||||||
|
with contextlib.closing(zmq_request.NotificationFanoutRequest(
|
||||||
|
target, context, message, version=version,
|
||||||
|
retry=retry)) as request:
|
||||||
|
self.dealer_publisher.send_request(request)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.dealer_publisher.cleanup()
|
95
oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
Normal file
95
oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class Request(object):
|
||||||
|
|
||||||
|
def __init__(self, target, context=None, message=None, retry=None):
|
||||||
|
|
||||||
|
if self.msg_type not in zmq_names.MESSAGE_TYPES:
|
||||||
|
raise RuntimeError("Unknown message type!")
|
||||||
|
|
||||||
|
self.target = target
|
||||||
|
self.context = context
|
||||||
|
self.message = message
|
||||||
|
self.retry = retry
|
||||||
|
|
||||||
|
@abc.abstractproperty
|
||||||
|
def msg_type(self):
|
||||||
|
"""ZMQ message type"""
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Nothing to close in base request"""
|
||||||
|
|
||||||
|
|
||||||
|
class RpcRequest(Request):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
message = kwargs.get("message")
|
||||||
|
if message['method'] is None:
|
||||||
|
errmsg = _LE("No method specified for RPC call")
|
||||||
|
LOG.error(errmsg)
|
||||||
|
raise KeyError(errmsg)
|
||||||
|
|
||||||
|
self.timeout = kwargs.pop("timeout")
|
||||||
|
assert self.timeout is not None, "Timeout should be specified!"
|
||||||
|
|
||||||
|
super(RpcRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class CallRequest(RpcRequest):
|
||||||
|
|
||||||
|
msg_type = zmq_names.CALL_TYPE
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
|
||||||
|
super(CallRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class CastRequest(RpcRequest):
|
||||||
|
|
||||||
|
msg_type = zmq_names.CAST_TYPE
|
||||||
|
|
||||||
|
|
||||||
|
class FanoutRequest(RpcRequest):
|
||||||
|
|
||||||
|
msg_type = zmq_names.CAST_FANOUT_TYPE
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationRequest(Request):
|
||||||
|
|
||||||
|
msg_type = zmq_names.NOTIFY_TYPE
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.version = kwargs.pop("version")
|
||||||
|
super(NotificationRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationFanoutRequest(NotificationRequest):
|
||||||
|
|
||||||
|
msg_type = zmq_names.NOTIFY_FANOUT_TYPE
|
@ -1 +0,0 @@
|
|||||||
__author__ = 'ozamiatin'
|
|
@ -1,76 +0,0 @@
|
|||||||
# Copyright 2015 Mirantis, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
import oslo_messaging
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_target
|
|
||||||
from oslo_messaging._i18n import _LE, _LI
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
|
||||||
|
|
||||||
|
|
||||||
class CallRequest(Request):
|
|
||||||
|
|
||||||
msg_type = zmq_serializer.CALL_TYPE
|
|
||||||
|
|
||||||
def __init__(self, conf, target, context, message, timeout=None,
|
|
||||||
retry=None, allowed_remote_exmods=None, matchmaker=None):
|
|
||||||
self.allowed_remote_exmods = allowed_remote_exmods or []
|
|
||||||
self.matchmaker = matchmaker
|
|
||||||
self.reply_poller = zmq_async.get_reply_poller()
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.zmq_context = zmq.Context()
|
|
||||||
socket = self.zmq_context.socket(zmq.REQ)
|
|
||||||
super(CallRequest, self).__init__(conf, target, context,
|
|
||||||
message, socket,
|
|
||||||
timeout, retry)
|
|
||||||
self.host = self.matchmaker.get_single_host(self.target)
|
|
||||||
self.connect_address = zmq_target.get_tcp_direct_address(
|
|
||||||
self.host)
|
|
||||||
LOG.info(_LI("Connecting REQ to %s") % self.connect_address)
|
|
||||||
self.socket.connect(self.connect_address)
|
|
||||||
self.reply_poller.register(
|
|
||||||
self.socket, recv_method=lambda socket: socket.recv_json())
|
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
|
||||||
errmsg = _LE("Error connecting to socket: %s") % str(e)
|
|
||||||
LOG.error(errmsg)
|
|
||||||
raise rpc_common.RPCException(errmsg)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.reply_poller.close()
|
|
||||||
self.socket.setsockopt(zmq.LINGER, 0)
|
|
||||||
self.socket.close()
|
|
||||||
|
|
||||||
def receive_reply(self):
|
|
||||||
# NOTE(ozamiatin): Check for retry here (no retries now)
|
|
||||||
reply, socket = self.reply_poller.poll(timeout=self.timeout)
|
|
||||||
if reply is None:
|
|
||||||
raise oslo_messaging.MessagingTimeout(
|
|
||||||
"Timeout %s seconds was reached" % self.timeout)
|
|
||||||
|
|
||||||
if reply[zmq_serializer.FIELD_FAILURE]:
|
|
||||||
raise rpc_common.deserialize_remote_exception(
|
|
||||||
reply[zmq_serializer.FIELD_FAILURE],
|
|
||||||
self.allowed_remote_exmods)
|
|
||||||
else:
|
|
||||||
return reply[zmq_serializer.FIELD_REPLY]
|
|
@ -1,100 +0,0 @@
|
|||||||
# Copyright 2015 Mirantis, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
|
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_target
|
|
||||||
from oslo_messaging._i18n import _LE, _LI
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
|
||||||
|
|
||||||
|
|
||||||
class CastRequest(Request):
|
|
||||||
|
|
||||||
msg_type = zmq_serializer.CAST_TYPE
|
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
|
||||||
self.send_request()
|
|
||||||
|
|
||||||
def send_request(self):
|
|
||||||
self.socket.send(b'', zmq.SNDMORE)
|
|
||||||
super(CastRequest, self).send_request()
|
|
||||||
|
|
||||||
def receive_reply(self):
|
|
||||||
# Ignore reply for CAST
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutRequest(CastRequest):
|
|
||||||
|
|
||||||
msg_type = zmq_serializer.FANOUT_TYPE
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.hosts_count = kwargs.pop("hosts_count")
|
|
||||||
super(FanoutRequest, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
def send_request(self):
|
|
||||||
for _ in range(self.hosts_count):
|
|
||||||
super(FanoutRequest, self).send_request()
|
|
||||||
|
|
||||||
|
|
||||||
class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
|
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker):
|
|
||||||
super(DealerCastPublisher, self).__init__(conf)
|
|
||||||
self.matchmaker = matchmaker
|
|
||||||
|
|
||||||
def cast(self, target, context,
|
|
||||||
message, timeout=None, retry=None):
|
|
||||||
if str(target) in self.outbound_sockets:
|
|
||||||
dealer_socket, hosts = self.outbound_sockets[str(target)]
|
|
||||||
else:
|
|
||||||
dealer_socket = zmq.Context().socket(zmq.DEALER)
|
|
||||||
hosts = self.matchmaker.get_hosts(target)
|
|
||||||
for host in hosts:
|
|
||||||
self._connect_to_host(dealer_socket, host)
|
|
||||||
self.outbound_sockets[str(target)] = (dealer_socket, hosts)
|
|
||||||
|
|
||||||
if target.fanout:
|
|
||||||
request = FanoutRequest(self.conf, target, context, message,
|
|
||||||
dealer_socket, timeout, retry,
|
|
||||||
hosts_count=len(hosts))
|
|
||||||
else:
|
|
||||||
request = CastRequest(self.conf, target, context, message,
|
|
||||||
dealer_socket, timeout, retry)
|
|
||||||
|
|
||||||
request.send_request()
|
|
||||||
|
|
||||||
def _connect_to_host(self, socket, host):
|
|
||||||
address = zmq_target.get_tcp_direct_address(host)
|
|
||||||
try:
|
|
||||||
LOG.info(_LI("Connecting DEALER to %s") % address)
|
|
||||||
socket.connect(address)
|
|
||||||
except zmq.ZMQError as e:
|
|
||||||
errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\
|
|
||||||
% (address, e)
|
|
||||||
LOG.error(errmsg)
|
|
||||||
raise rpc_common.RPCException(errmsg)
|
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
for socket, hosts in self.outbound_sockets.values():
|
|
||||||
socket.setsockopt(zmq.LINGER, 0)
|
|
||||||
socket.close()
|
|
@ -1,31 +0,0 @@
|
|||||||
# Copyright 2015 Mirantis, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import abc
|
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
|
||||||
class CastPublisherBase(object):
|
|
||||||
|
|
||||||
def __init__(self, conf):
|
|
||||||
self.conf = conf
|
|
||||||
self.outbound_sockets = {}
|
|
||||||
super(CastPublisherBase, self).__init__()
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def cast(self, target, context,
|
|
||||||
message, timeout=None, retry=None):
|
|
||||||
"Send CAST to target"
|
|
@ -1,41 +0,0 @@
|
|||||||
# Copyright 2015 Mirantis, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import contextlib
|
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request
|
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer
|
|
||||||
|
|
||||||
|
|
||||||
class ZmqClient(object):
|
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
|
||||||
self.conf = conf
|
|
||||||
self.matchmaker = matchmaker
|
|
||||||
self.allowed_remote_exmods = allowed_remote_exmods or []
|
|
||||||
self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf,
|
|
||||||
matchmaker)
|
|
||||||
|
|
||||||
def call(self, target, context, message, timeout=None, retry=None):
|
|
||||||
with contextlib.closing(zmq_call_request.CallRequest(
|
|
||||||
self.conf, target, context, message, timeout, retry,
|
|
||||||
self.allowed_remote_exmods,
|
|
||||||
self.matchmaker)) as request:
|
|
||||||
return request()
|
|
||||||
|
|
||||||
def cast(self, target, context, message, timeout=None, retry=None):
|
|
||||||
self.cast_publisher.cast(target, context, message, timeout, retry)
|
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
self.cast_publisher.cleanup()
|
|
@ -1,66 +0,0 @@
|
|||||||
# Copyright 2015 Mirantis, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import abc
|
|
||||||
import logging
|
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
|
||||||
from oslo_messaging._i18n import _LE
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
|
||||||
class Request(object):
|
|
||||||
|
|
||||||
def __init__(self, conf, target, context, message,
|
|
||||||
socket, timeout=None, retry=None):
|
|
||||||
|
|
||||||
if self.msg_type not in zmq_serializer.MESSAGE_TYPES:
|
|
||||||
raise RuntimeError("Unknown msg type!")
|
|
||||||
|
|
||||||
if message['method'] is None:
|
|
||||||
errmsg = _LE("No method specified for RPC call")
|
|
||||||
LOG.error(errmsg)
|
|
||||||
raise KeyError(errmsg)
|
|
||||||
|
|
||||||
self.target = target
|
|
||||||
self.context = context
|
|
||||||
self.message = message
|
|
||||||
self.timeout = timeout or conf.rpc_response_timeout
|
|
||||||
self.retry = retry
|
|
||||||
self.reply = None
|
|
||||||
self.socket = socket
|
|
||||||
|
|
||||||
@abc.abstractproperty
|
|
||||||
def msg_type(self):
|
|
||||||
"""ZMQ message type"""
|
|
||||||
|
|
||||||
def send_request(self):
|
|
||||||
self.socket.send_string(self.msg_type, zmq.SNDMORE)
|
|
||||||
self.socket.send_json(self.context, zmq.SNDMORE)
|
|
||||||
self.socket.send_json(self.message)
|
|
||||||
|
|
||||||
def __call__(self):
|
|
||||||
self.send_request()
|
|
||||||
return self.receive_reply()
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def receive_reply(self):
|
|
||||||
"Receive reply from server side"
|
|
@ -14,60 +14,50 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from oslo_messaging._drivers import base
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_incoming_message
|
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_target
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
from oslo_messaging._i18n import _LE
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class ZmqServer(base.Listener):
|
class RouterConsumer(object):
|
||||||
|
|
||||||
|
def __init__(self, conf, poller, server):
|
||||||
|
|
||||||
|
self.poller = poller
|
||||||
|
self.server = server
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker=None):
|
|
||||||
self.conf = conf
|
|
||||||
try:
|
try:
|
||||||
self.context = zmq.Context()
|
self.context = zmq.Context()
|
||||||
self.socket = self.context.socket(zmq.ROUTER)
|
self.socket = self.context.socket(zmq.ROUTER)
|
||||||
self.address = zmq_target.get_tcp_random_address(conf)
|
self.address = zmq_address.get_tcp_random_address(conf)
|
||||||
self.port = self.socket.bind_to_random_port(self.address)
|
self.port = self.socket.bind_to_random_port(self.address)
|
||||||
LOG.info("Run server on %s:%d" % (self.address, self.port))
|
LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"),
|
||||||
|
{"addr": self.address,
|
||||||
|
"port": self.port})
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
||||||
% (self.port, e)
|
% (self.port, e)
|
||||||
LOG.error(errmsg)
|
LOG.error(errmsg)
|
||||||
raise rpc_common.RPCException(errmsg)
|
raise rpc_common.RPCException(errmsg)
|
||||||
|
|
||||||
self.poller = zmq_async.get_poller()
|
def listen(self, target):
|
||||||
self.matchmaker = matchmaker
|
LOG.info(_LI("Listen to target %s") % str(target))
|
||||||
|
|
||||||
def poll(self, timeout=None):
|
|
||||||
self.poller.register(self.socket, self._receive_message)
|
self.poller.register(self.socket, self._receive_message)
|
||||||
incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout)
|
|
||||||
return incoming[0]
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
LOG.info("Stop server tcp://%s:%d" % (self.address, self.port))
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.poller.close()
|
|
||||||
if not self.socket.closed:
|
if not self.socket.closed:
|
||||||
self.socket.setsockopt(zmq.LINGER, 0)
|
self.socket.setsockopt(zmq.LINGER, 0)
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
|
||||||
def listen(self, target):
|
|
||||||
LOG.info("Listen to Target %s on tcp://%s:%d" %
|
|
||||||
(target, self.address, self.port))
|
|
||||||
host = zmq_target.combine_address(self.conf.rpc_zmq_host, self.port)
|
|
||||||
self.matchmaker.register(target=target,
|
|
||||||
hostname=host)
|
|
||||||
|
|
||||||
def _receive_message(self, socket):
|
def _receive_message(self, socket):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reply_id = socket.recv()
|
reply_id = socket.recv()
|
||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
@ -76,15 +66,20 @@ class ZmqServer(base.Listener):
|
|||||||
assert msg_type is not None, 'Bad format: msg type expected'
|
assert msg_type is not None, 'Bad format: msg type expected'
|
||||||
context = socket.recv_json()
|
context = socket.recv_json()
|
||||||
message = socket.recv_json()
|
message = socket.recv_json()
|
||||||
LOG.debug("Received CALL message %s" % str(message))
|
LOG.debug("Received %s message %s" % (msg_type, str(message)))
|
||||||
|
|
||||||
direct_type = (zmq_serializer.CALL_TYPE, zmq_serializer.CAST_TYPE)
|
if msg_type == zmq_names.CALL_TYPE:
|
||||||
if msg_type in direct_type:
|
|
||||||
return zmq_incoming_message.ZmqIncomingRequest(
|
return zmq_incoming_message.ZmqIncomingRequest(
|
||||||
self, context, message, socket, reply_id, self.poller)
|
self.server, context, message, socket, reply_id,
|
||||||
elif msg_type == zmq_serializer.FANOUT_TYPE:
|
self.poller)
|
||||||
return zmq_incoming_message.ZmqFanoutMessage(
|
elif msg_type in zmq_names.CAST_TYPES:
|
||||||
self, context, message, socket, self.poller)
|
return zmq_incoming_message.ZmqCastMessage(
|
||||||
|
self.server, context, message, socket, self.poller)
|
||||||
|
elif msg_type in zmq_names.NOTIFY_TYPES:
|
||||||
|
return zmq_incoming_message.ZmqNotificationMessage(
|
||||||
|
self.server, context, message, socket, self.poller)
|
||||||
|
else:
|
||||||
|
LOG.error(_LE("Unknown message type: %s") % msg_type)
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
@ -18,7 +18,7 @@ import logging
|
|||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_serializer
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -39,9 +39,9 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
|||||||
if failure is not None:
|
if failure is not None:
|
||||||
failure = rpc_common.serialize_remote_exception(failure,
|
failure = rpc_common.serialize_remote_exception(failure,
|
||||||
log_failure)
|
log_failure)
|
||||||
message_reply = {zmq_serializer.FIELD_REPLY: reply,
|
message_reply = {zmq_names.FIELD_REPLY: reply,
|
||||||
zmq_serializer.FIELD_FAILURE: failure,
|
zmq_names.FIELD_FAILURE: failure,
|
||||||
zmq_serializer.FIELD_LOG_FAILURE: log_failure}
|
zmq_names.FIELD_LOG_FAILURE: log_failure}
|
||||||
LOG.debug("Replying %s REP", (str(message_reply)))
|
LOG.debug("Replying %s REP", (str(message_reply)))
|
||||||
self.received = True
|
self.received = True
|
||||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||||
@ -56,10 +56,10 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ZmqFanoutMessage(base.IncomingMessage):
|
class ZmqCastMessage(base.IncomingMessage):
|
||||||
|
|
||||||
def __init__(self, listener, context, message, socket, poller):
|
def __init__(self, listener, context, message, socket, poller):
|
||||||
super(ZmqFanoutMessage, self).__init__(listener, context, message)
|
super(ZmqCastMessage, self).__init__(listener, context, message)
|
||||||
poller.resume_polling(socket)
|
poller.resume_polling(socket)
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
@ -70,3 +70,20 @@ class ZmqFanoutMessage(base.IncomingMessage):
|
|||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqNotificationMessage(base.IncomingMessage):
|
||||||
|
|
||||||
|
def __init__(self, listener, context, message, socket, poller):
|
||||||
|
super(ZmqNotificationMessage, self).__init__(listener, context,
|
||||||
|
message)
|
||||||
|
poller.resume_polling(socket)
|
||||||
|
|
||||||
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
|
"""Reply is not needed for notification messages"""
|
||||||
|
|
||||||
|
def acknowledge(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def requeue(self):
|
||||||
|
pass
|
80
oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
Normal file
80
oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
# Copyright 2015 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import base
|
||||||
|
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||||
|
import zmq_router_consumer
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqServer(base.Listener):
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker=None):
|
||||||
|
self.conf = conf
|
||||||
|
self.matchmaker = matchmaker
|
||||||
|
self.poller = zmq_async.get_poller()
|
||||||
|
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
|
||||||
|
conf, self.poller, self)
|
||||||
|
self.notify_consumer = self.rpc_consumer
|
||||||
|
self.consumers = [self.rpc_consumer]
|
||||||
|
|
||||||
|
def poll(self, timeout=None):
|
||||||
|
message, socket = self.poller.poll(
|
||||||
|
timeout or self.conf.rpc_poll_timeout)
|
||||||
|
return message
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
consumer = self.rpc_consumer
|
||||||
|
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.poller.close()
|
||||||
|
for consumer in self.consumers:
|
||||||
|
consumer.cleanup()
|
||||||
|
|
||||||
|
def listen(self, target):
|
||||||
|
|
||||||
|
consumer = self.rpc_consumer
|
||||||
|
consumer.listen(target)
|
||||||
|
|
||||||
|
LOG.info("Listen to target %s on %s:%d" %
|
||||||
|
(target, consumer.address, consumer.port))
|
||||||
|
|
||||||
|
host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||||
|
consumer.port)
|
||||||
|
self.matchmaker.register(target=target,
|
||||||
|
hostname=host)
|
||||||
|
|
||||||
|
def listen_notification(self, targets_and_priorities):
|
||||||
|
|
||||||
|
consumer = self.notify_consumer
|
||||||
|
|
||||||
|
LOG.info("Listen for notifications on %s:%d"
|
||||||
|
% (consumer.address, consumer.port))
|
||||||
|
|
||||||
|
for target, priority in targets_and_priorities:
|
||||||
|
host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||||
|
consumer.port)
|
||||||
|
t = copy.deepcopy(target)
|
||||||
|
t.topic = target.topic + '.' + priority
|
||||||
|
self.matchmaker.register(target=t, hostname=host)
|
||||||
|
consumer.listen(t)
|
@ -12,9 +12,6 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
MESSAGE_CALL_TYPE_POSITION = 1
|
|
||||||
MESSAGE_CALL_TARGET_POSITION = 2
|
|
||||||
MESSAGE_CALL_TOPIC_POSITION = 3
|
|
||||||
|
|
||||||
FIELD_FAILURE = 'failure'
|
FIELD_FAILURE = 'failure'
|
||||||
FIELD_REPLY = 'reply'
|
FIELD_REPLY = 'reply'
|
||||||
@ -22,7 +19,17 @@ FIELD_LOG_FAILURE = 'log_failure'
|
|||||||
|
|
||||||
CALL_TYPE = 'call'
|
CALL_TYPE = 'call'
|
||||||
CAST_TYPE = 'cast'
|
CAST_TYPE = 'cast'
|
||||||
FANOUT_TYPE = 'fanout'
|
CAST_FANOUT_TYPE = 'cast-f'
|
||||||
NOTIFY_TYPE = 'notify'
|
NOTIFY_TYPE = 'notify'
|
||||||
|
NOTIFY_FANOUT_TYPE = 'notify-f'
|
||||||
|
|
||||||
MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE)
|
MESSAGE_TYPES = (CALL_TYPE,
|
||||||
|
CAST_TYPE,
|
||||||
|
CAST_FANOUT_TYPE,
|
||||||
|
NOTIFY_TYPE,
|
||||||
|
NOTIFY_FANOUT_TYPE)
|
||||||
|
|
||||||
|
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
|
||||||
|
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
|
||||||
|
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
|
||||||
|
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
|
@ -29,11 +29,10 @@ LOG = logging.getLogger(__name__)
|
|||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class TestRPCServerListener(object):
|
class TestServerListener(object):
|
||||||
|
|
||||||
def __init__(self, driver):
|
def __init__(self, driver):
|
||||||
self.driver = driver
|
self.driver = driver
|
||||||
self.target = None
|
|
||||||
self.listener = None
|
self.listener = None
|
||||||
self.executor = zmq_async.get_executor(self._run)
|
self.executor = zmq_async.get_executor(self._run)
|
||||||
self._stop = threading.Event()
|
self._stop = threading.Event()
|
||||||
@ -41,8 +40,12 @@ class TestRPCServerListener(object):
|
|||||||
self.message = None
|
self.message = None
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
self.target = target
|
self.listener = self.driver.listen(target)
|
||||||
self.listener = self.driver.listen(self.target)
|
self.executor.execute()
|
||||||
|
|
||||||
|
def listen_notifications(self, targets_and_priorities):
|
||||||
|
self.listener = self.driver.listen_for_notifications(
|
||||||
|
targets_and_priorities, {})
|
||||||
self.executor.execute()
|
self.executor.execute()
|
||||||
|
|
||||||
def _run(self):
|
def _run(self):
|
||||||
@ -80,7 +83,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
|
|||||||
transport = oslo_messaging.get_transport(self.conf)
|
transport = oslo_messaging.get_transport(self.conf)
|
||||||
self.driver = transport._driver
|
self.driver = transport._driver
|
||||||
|
|
||||||
self.listener = TestRPCServerListener(self.driver)
|
self.listener = TestServerListener(self.driver)
|
||||||
|
|
||||||
self.addCleanup(stopRpc(self.__dict__))
|
self.addCleanup(stopRpc(self.__dict__))
|
||||||
|
|
||||||
@ -174,6 +177,20 @@ class TestZmqBasics(ZmqBaseTestCase):
|
|||||||
wait_for_reply=True)
|
wait_for_reply=True)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_send_receive_notification(self):
|
||||||
|
"""Notify() test"""
|
||||||
|
|
||||||
|
target = oslo_messaging.Target(topic='t1',
|
||||||
|
server='notification@server')
|
||||||
|
self.listener.listen_notifications([(target, 'info')])
|
||||||
|
|
||||||
|
message = {'method': 'hello-world', 'tx_id': 1}
|
||||||
|
context = {}
|
||||||
|
target.topic = target.topic + '.info'
|
||||||
|
self.driver.send_notification(target, context, message, '3.0')
|
||||||
|
self.listener._received.wait()
|
||||||
|
self.assertTrue(self.listener._received.isSet())
|
||||||
|
|
||||||
|
|
||||||
class TestPoller(test_utils.BaseTestCase):
|
class TestPoller(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
@ -187,11 +187,6 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
|||||||
# NOTE(sileht): Each test must not use the same topics
|
# NOTE(sileht): Each test must not use the same topics
|
||||||
# to be run in parallel
|
# to be run in parallel
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(NotifyTestCase, self).setUp()
|
|
||||||
if self.url.startswith("zmq"):
|
|
||||||
self.skipTest("Skip NotifyTestCase for ZMQ driver")
|
|
||||||
|
|
||||||
def test_simple(self):
|
def test_simple(self):
|
||||||
listener = self.useFixture(
|
listener = self.useFixture(
|
||||||
utils.NotificationFixture(self.url, ['test_simple']))
|
utils.NotificationFixture(self.url, ['test_simple']))
|
||||||
|
@ -25,10 +25,10 @@ import logging
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo_config import cfg
|
||||||
from oslo import messaging
|
import oslo_messaging as messaging
|
||||||
from oslo.messaging import notify
|
from oslo_messaging import notify
|
||||||
from oslo.messaging import rpc
|
from oslo_messaging import rpc
|
||||||
|
|
||||||
LOG = logging.getLogger()
|
LOG = logging.getLogger()
|
||||||
|
|
||||||
|
2
tox.ini
2
tox.ini
@ -41,7 +41,7 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
|
|||||||
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||||
|
|
||||||
[testenv:py27-func-zeromq]
|
[testenv:py27-func-zeromq]
|
||||||
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional.test_functional'
|
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||||
|
|
||||||
[flake8]
|
[flake8]
|
||||||
show-source = True
|
show-source = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user