Merge "[zmq] Fix message sending when using proxy and not using PUB/SUB"
This commit is contained in:
commit
7a0f9334db
@ -55,7 +55,10 @@ class DealerPublisherProxy(object):
|
||||
zmq_address.target_to_subscribe_filter(request.target)
|
||||
self._do_send_request(request, routing_key)
|
||||
else:
|
||||
routing_keys = self.routing_table.get_all_hosts(request.target)
|
||||
routing_keys = \
|
||||
[self.routing_table.get_routable_host(request.target)] \
|
||||
if request.msg_type in zmq_names.DIRECT_TYPES else \
|
||||
self.routing_table.get_all_hosts(request.target)
|
||||
for routing_key in routing_keys:
|
||||
self._do_send_request(request, routing_key)
|
||||
|
||||
|
@ -171,7 +171,6 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
return self._redis.smembers(key)
|
||||
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
self._add_key_with_expire(key, hostname, expire)
|
||||
@ -191,11 +190,14 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
|
||||
def get_hosts(self, target, listener_type):
|
||||
LOG.debug("[Redis] get_hosts for target %s", target)
|
||||
hosts = []
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
hosts.extend(self._get_hosts_by_key(key))
|
||||
|
||||
if (not hosts or target.fanout) and target.topic and target.server:
|
||||
hosts = []
|
||||
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
hosts.extend(self._get_hosts_by_key(key))
|
||||
|
||||
if not hosts and target.topic:
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
hosts.extend(self._get_hosts_by_key(key))
|
||||
|
||||
|
@ -290,6 +290,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
||||
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
|
||||
if zmq_redis_port:
|
||||
self.config(port=zmq_redis_port, group="matchmaker_redis")
|
||||
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
|
||||
if zmq_use_pub_sub:
|
||||
self.config(use_pub_sub=zmq_use_pub_sub)
|
||||
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
|
||||
if zmq_use_router_proxy:
|
||||
self.config(use_router_proxy=zmq_use_router_proxy)
|
||||
|
||||
|
||||
class NotificationFixture(fixtures.Fixture):
|
||||
|
@ -10,20 +10,22 @@ export TRANSPORT_URL=zmq://
|
||||
export ZMQ_MATCHMAKER=redis
|
||||
export ZMQ_REDIS_PORT=65123
|
||||
export ZMQ_IPC_DIR=${DATADIR}
|
||||
export ZMQ_USE_PUB_SUB=false
|
||||
export ZMQ_USE_ROUTER_PROXY=true
|
||||
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||
use_pub_sub=true
|
||||
use_router_proxy=true
|
||||
use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
|
||||
[matchmaker_redis]
|
||||
port=${ZMQ_REDIS_PORT}
|
||||
EOF
|
||||
|
||||
redis-server --port $ZMQ_REDIS_PORT &
|
||||
|
||||
oslo-messaging-zmq-proxy --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 &
|
||||
oslo-messaging-zmq-proxy --debug True --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
|
||||
|
||||
$*
|
||||
|
Loading…
x
Reference in New Issue
Block a user