[zmq] Remove unnecessary subscriptions from SubConsumer
Change-Id: I86c0e3408382ee85b44f440e72fab7e468f7f7e1
This commit is contained in:
parent
09fa044777
commit
eaf433bf49
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -23,7 +24,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
|
|||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -37,32 +38,34 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
self.matchmaker = server.matchmaker
|
self.matchmaker = server.matchmaker
|
||||||
self.target = server.target
|
self.target = server.target
|
||||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
|
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
|
||||||
immediate=False)
|
immediate=False,
|
||||||
|
identity=self._generate_identity())
|
||||||
self.sockets.append(self.socket)
|
self.sockets.append(self.socket)
|
||||||
self._subscribe_on_target(self.target)
|
self.host = self.socket.handle.identity
|
||||||
|
self._subscribe_to_topic()
|
||||||
self.connection_updater = SubscriberConnectionUpdater(
|
self.connection_updater = SubscriberConnectionUpdater(
|
||||||
conf, self.matchmaker, self.socket)
|
conf, self.matchmaker, self.socket)
|
||||||
self.poller.register(self.socket, self.receive_message)
|
self.poller.register(self.socket, self.receive_message)
|
||||||
|
LOG.info(_LI("[%s] Run SUB consumer"), self.host)
|
||||||
|
|
||||||
def _subscribe_on_target(self, target):
|
def _generate_identity(self):
|
||||||
topic_filter = zmq_address.target_to_subscribe_filter(target)
|
return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \
|
||||||
if target.topic:
|
zmq_address.target_to_subscribe_filter(self.target) + \
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
|
six.b('/' + str(uuid.uuid4()))
|
||||||
if target.server:
|
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
|
def _subscribe_to_topic(self):
|
||||||
if target.topic and target.server:
|
topic_filter = zmq_address.target_to_subscribe_filter(self.target)
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
||||||
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
|
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
|
||||||
{"host": self.socket.handle.identity,
|
{"host": self.host, "filter": topic_filter})
|
||||||
"filter": topic_filter})
|
|
||||||
|
|
||||||
@staticmethod
|
def _receive_request(self, socket):
|
||||||
def _receive_request(socket):
|
|
||||||
topic_filter = socket.recv()
|
topic_filter = socket.recv()
|
||||||
message_id = socket.recv()
|
message_id = socket.recv()
|
||||||
context, message = socket.recv_loaded()
|
context, message = socket.recv_loaded()
|
||||||
LOG.debug("Received %(topic_filter)s topic message %(id)s",
|
LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s",
|
||||||
{'id': message_id, 'topic_filter': topic_filter})
|
{'host': self.host, 'filter': topic_filter,
|
||||||
|
'msg_id': message_id})
|
||||||
return context, message
|
return context, message
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
@ -75,6 +78,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
LOG.info(_LI("[%s] Destroy SUB consumer"), self.host)
|
||||||
self.connection_updater.cleanup()
|
self.connection_updater.cleanup()
|
||||||
super(SubConsumer, self).cleanup()
|
super(SubConsumer, self).cleanup()
|
||||||
|
|
||||||
@ -83,7 +87,7 @@ class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
|
|||||||
|
|
||||||
def _update_connection(self):
|
def _update_connection(self):
|
||||||
publishers = self.matchmaker.get_publishers()
|
publishers = self.matchmaker.get_publishers()
|
||||||
for host, sync in publishers:
|
for publisher_address, router_address in publishers:
|
||||||
self.socket.connect(zmq_address.get_tcp_direct_address(host))
|
self.socket.connect_to_host(publisher_address)
|
||||||
LOG.debug("[%s] SUB consumer connected to publishers %s",
|
LOG.debug("[%s] SUB consumer connected to publishers %s",
|
||||||
self.socket.handle.identity, publishers)
|
self.socket.handle.identity, publishers)
|
||||||
|
@ -50,11 +50,4 @@ def target_to_key(target, listener_type=None):
|
|||||||
|
|
||||||
|
|
||||||
def target_to_subscribe_filter(target):
|
def target_to_subscribe_filter(target):
|
||||||
if target.topic and target.server:
|
return six.b(target.topic)
|
||||||
attributes = ['topic', 'server']
|
|
||||||
key = "/".join(getattr(target, attr) for attr in attributes)
|
|
||||||
return six.b(key)
|
|
||||||
if target.topic:
|
|
||||||
return six.b(target.topic)
|
|
||||||
if target.server:
|
|
||||||
return six.b(target.server)
|
|
||||||
|
@ -177,18 +177,19 @@ class ZmqSocket(object):
|
|||||||
if address in self.connections:
|
if address in self.connections:
|
||||||
return
|
return
|
||||||
stype = zmq_names.socket_type_str(self.socket_type)
|
stype = zmq_names.socket_type_str(self.socket_type)
|
||||||
|
sid = self.handle.identity
|
||||||
try:
|
try:
|
||||||
LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
|
LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"),
|
||||||
{"stype": stype,
|
{"stype": stype, "sid": sid, "address": address})
|
||||||
"id": self.handle.identity,
|
|
||||||
"address": address})
|
|
||||||
self.connect(address)
|
self.connect(address)
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \
|
LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to "
|
||||||
% {"stype": stype, "address": address, "e": e}
|
"%(address)s: %(e)s"),
|
||||||
LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"),
|
{"stype": stype, "sid": sid, "address": address, "e": e})
|
||||||
{"stype": stype, "address": address, "e": e})
|
raise rpc_common.RPCException(
|
||||||
raise rpc_common.RPCException(errmsg)
|
"Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" %
|
||||||
|
{"stype": stype, "sid": sid, "address": address, "e": e}
|
||||||
|
)
|
||||||
|
|
||||||
def connect_to_host(self, host):
|
def connect_to_host(self, host):
|
||||||
address = zmq_address.get_tcp_direct_address(
|
address = zmq_address.get_tcp_direct_address(
|
||||||
|
Loading…
Reference in New Issue
Block a user