From eaf433bf4978a1eb8585f6979918716e31030b8e Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Wed, 14 Sep 2016 10:59:07 +0300 Subject: [PATCH] [zmq] Remove unnecessary subscriptions from SubConsumer Change-Id: I86c0e3408382ee85b44f440e72fab7e468f7f7e1 --- .../server/consumers/zmq_sub_consumer.py | 42 ++++++++++--------- .../_drivers/zmq_driver/zmq_address.py | 9 +--- .../_drivers/zmq_driver/zmq_socket.py | 19 +++++---- 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 9e74bc498..89b630ed7 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -13,6 +13,7 @@ # under the License. import logging +import uuid import six @@ -23,7 +24,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LE +from oslo_messaging._i18n import _LE, _LI LOG = logging.getLogger(__name__) @@ -37,32 +38,34 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.matchmaker = server.matchmaker self.target = server.target self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB, - immediate=False) + immediate=False, + identity=self._generate_identity()) self.sockets.append(self.socket) - self._subscribe_on_target(self.target) + self.host = self.socket.handle.identity + self._subscribe_to_topic() self.connection_updater = SubscriberConnectionUpdater( conf, self.matchmaker, self.socket) self.poller.register(self.socket, self.receive_message) + LOG.info(_LI("[%s] Run SUB consumer"), self.host) - def _subscribe_on_target(self, target): - topic_filter = zmq_address.target_to_subscribe_filter(target) - if target.topic: - self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic)) - if target.server: - self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server)) - if target.topic and target.server: - self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) + def _generate_identity(self): + return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \ + zmq_address.target_to_subscribe_filter(self.target) + \ + six.b('/' + str(uuid.uuid4())) + + def _subscribe_to_topic(self): + topic_filter = zmq_address.target_to_subscribe_filter(self.target) + self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) LOG.debug("[%(host)s] Subscribing to topic %(filter)s", - {"host": self.socket.handle.identity, - "filter": topic_filter}) + {"host": self.host, "filter": topic_filter}) - @staticmethod - def _receive_request(socket): + def _receive_request(self, socket): topic_filter = socket.recv() message_id = socket.recv() context, message = socket.recv_loaded() - LOG.debug("Received %(topic_filter)s topic message %(id)s", - {'id': message_id, 'topic_filter': topic_filter}) + LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s", + {'host': self.host, 'filter': topic_filter, + 'msg_id': message_id}) return context, message def receive_message(self, socket): @@ -75,6 +78,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): LOG.error(_LE("Receiving message failed: %s"), str(e)) def cleanup(self): + LOG.info(_LI("[%s] Destroy SUB consumer"), self.host) self.connection_updater.cleanup() super(SubConsumer, self).cleanup() @@ -83,7 +87,7 @@ class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater): def _update_connection(self): publishers = self.matchmaker.get_publishers() - for host, sync in publishers: - self.socket.connect(zmq_address.get_tcp_direct_address(host)) + for publisher_address, router_address in publishers: + self.socket.connect_to_host(publisher_address) LOG.debug("[%s] SUB consumer connected to publishers %s", self.socket.handle.identity, publishers) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index 5f4d2a031..4d45dea54 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -50,11 +50,4 @@ def target_to_key(target, listener_type=None): def target_to_subscribe_filter(target): - if target.topic and target.server: - attributes = ['topic', 'server'] - key = "/".join(getattr(target, attr) for attr in attributes) - return six.b(key) - if target.topic: - return six.b(target.topic) - if target.server: - return six.b(target.server) + return six.b(target.topic) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 56a9d35a7..9210f637c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -177,18 +177,19 @@ class ZmqSocket(object): if address in self.connections: return stype = zmq_names.socket_type_str(self.socket_type) + sid = self.handle.identity try: - LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"), - {"stype": stype, - "id": self.handle.identity, - "address": address}) + LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"), + {"stype": stype, "sid": sid, "address": address}) self.connect(address) except zmq.ZMQError as e: - errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \ - % {"stype": stype, "address": address, "e": e} - LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"), - {"stype": stype, "address": address, "e": e}) - raise rpc_common.RPCException(errmsg) + LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to " + "%(address)s: %(e)s"), + {"stype": stype, "sid": sid, "address": address, "e": e}) + raise rpc_common.RPCException( + "Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" % + {"stype": stype, "sid": sid, "address": address, "e": e} + ) def connect_to_host(self, host): address = zmq_address.get_tcp_direct_address(