[zmq] Dummy add value aging mechanism
Dummy does not have value aging mechanisms and value filtering mechanisms. Like redis, we should use these two mechanisms Change-Id: I58d73e6b15a601e9e95274d47e822ae54ef5f5ba Closes-Bug: #1658940
This commit is contained in:
parent
35bf674364
commit
407544a040
41
oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py
Normal file → Executable file
41
oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py
Normal file → Executable file
@ -16,9 +16,11 @@ import collections
|
||||
import logging
|
||||
|
||||
import six
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -181,28 +183,48 @@ class MatchmakerDummy(MatchmakerBase):
|
||||
self._cache = collections.defaultdict(list)
|
||||
self._publishers = set()
|
||||
self._routers = set()
|
||||
self._address = {}
|
||||
self.executor = zmq_async.get_executor(method=self._loop)
|
||||
self.executor.execute()
|
||||
|
||||
def register_publisher(self, hostname, expire=-1):
|
||||
if hostname not in self._publishers:
|
||||
self._publishers.add(hostname)
|
||||
self._address[hostname] = expire
|
||||
|
||||
def unregister_publisher(self, hostname):
|
||||
if hostname in self._publishers:
|
||||
self._publishers.remove(hostname)
|
||||
if hostname in self._address:
|
||||
self._address.pop(hostname)
|
||||
|
||||
def get_publishers(self):
|
||||
return list(self._publishers)
|
||||
hosts = [host for host in self._publishers
|
||||
if self._address[host] > 0]
|
||||
return hosts
|
||||
|
||||
def register_router(self, hostname, expire=-1):
|
||||
if hostname not in self._routers:
|
||||
self._routers.add(hostname)
|
||||
self._address[hostname] = expire
|
||||
|
||||
def unregister_router(self, hostname):
|
||||
if hostname in self._routers:
|
||||
self._routers.remove(hostname)
|
||||
if hostname in self._address:
|
||||
self._address.pop(hostname)
|
||||
|
||||
def get_routers(self):
|
||||
return list(self._routers)
|
||||
hosts = [host for host in self._routers
|
||||
if self._address[host] > 0]
|
||||
return hosts
|
||||
|
||||
def _loop(self):
|
||||
for hostname in self._address:
|
||||
expire = self._address[hostname]
|
||||
if expire > 0:
|
||||
self._address[hostname] = expire - 1
|
||||
time.sleep(1)
|
||||
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
if target.server:
|
||||
@ -214,6 +236,8 @@ class MatchmakerDummy(MatchmakerBase):
|
||||
if hostname not in self._cache[key]:
|
||||
self._cache[key].append(hostname)
|
||||
|
||||
self._address[hostname] = expire
|
||||
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
if target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
@ -224,16 +248,21 @@ class MatchmakerDummy(MatchmakerBase):
|
||||
if hostname in self._cache[key]:
|
||||
self._cache[key].remove(hostname)
|
||||
|
||||
if hostname in self._address:
|
||||
self._address.pop(hostname)
|
||||
|
||||
def get_hosts(self, target, listener_type):
|
||||
hosts = []
|
||||
|
||||
if target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
hosts.extend(self._cache[key])
|
||||
hosts.extend([host for host in self._cache[key]
|
||||
if self._address[host] > 0])
|
||||
|
||||
if not hosts:
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
hosts.extend(self._cache[key])
|
||||
hosts.extend([host for host in self._cache[key]
|
||||
if self._address[host] > 0])
|
||||
|
||||
LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s",
|
||||
{"target": target, "hosts": hosts})
|
||||
@ -246,8 +275,10 @@ class MatchmakerDummy(MatchmakerBase):
|
||||
return self.get_hosts(target, listener_type)
|
||||
|
||||
def get_hosts_fanout(self, target, listener_type):
|
||||
hosts = []
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
hosts = list(self._cache[key])
|
||||
hosts.extend([host for host in self._cache[key]
|
||||
if self._address[host] > 0])
|
||||
|
||||
LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s",
|
||||
{"target": target, "hosts": hosts})
|
||||
|
42
oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
Normal file → Executable file
42
oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
Normal file → Executable file
@ -65,21 +65,41 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
||||
self.host2 = b"test_host2"
|
||||
|
||||
def test_register(self):
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertEqual([self.host1],
|
||||
self.test_matcher.get_hosts(self.target, "test"))
|
||||
|
||||
def test_register_two_hosts(self):
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host2, "test")
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host2,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||
[self.host1, self.host2])
|
||||
|
||||
def test_register_unregister(self):
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host2, "test")
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host2,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.test_matcher.unregister(self.target, self.host2, "test")
|
||||
|
||||
@ -87,8 +107,16 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
||||
[self.host1])
|
||||
|
||||
def test_register_two_same_hosts(self):
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(self.target, self.host1, "test")
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
self.test_matcher.register(
|
||||
self.target,
|
||||
self.host1,
|
||||
"test",
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.assertEqual([self.host1],
|
||||
self.test_matcher.get_hosts(self.target, "test"))
|
||||
|
4
oslo_messaging/tests/drivers/zmq/test_pub_sub.py
Normal file → Executable file
4
oslo_messaging/tests/drivers/zmq/test_pub_sub.py
Normal file → Executable file
@ -63,7 +63,9 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
self.conf, self.driver.matchmaker)
|
||||
self.driver.matchmaker.register_publisher((self.publisher.host, ''))
|
||||
self.driver.matchmaker.register_publisher(
|
||||
(self.publisher.host, ''),
|
||||
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
|
||||
|
||||
self.listeners = []
|
||||
for _ in range(self.LISTENERS_COUNT):
|
||||
|
Loading…
Reference in New Issue
Block a user