Merge "Fix duplicated notification"
This commit is contained in:
commit
48bc1aa0a5
@ -19,6 +19,8 @@ import futurist
|
|||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from six.moves import urllib_parse
|
from six.moves import urllib_parse
|
||||||
|
|
||||||
|
from zaqar.storage import pooling
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -34,19 +36,19 @@ class NotifierDriver(object):
|
|||||||
|
|
||||||
def post(self, queue_name, messages, client_uuid, project=None):
|
def post(self, queue_name, messages, client_uuid, project=None):
|
||||||
"""Send messages to the subscribers."""
|
"""Send messages to the subscribers."""
|
||||||
if self.subscription_controller:
|
if (self.subscription_controller and
|
||||||
|
not isinstance(self.subscription_controller,
|
||||||
|
pooling.SubscriptionController)):
|
||||||
subscribers = self.subscription_controller.list(queue_name,
|
subscribers = self.subscription_controller.list(queue_name,
|
||||||
project)
|
project)
|
||||||
|
|
||||||
for sub in next(subscribers):
|
for sub in next(subscribers):
|
||||||
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
||||||
data_driver = self.subscription_controller.driver
|
data_driver = self.subscription_controller.driver
|
||||||
conf = (getattr(data_driver, 'conf', None) or
|
|
||||||
getattr(data_driver, '_conf'))
|
|
||||||
mgr = driver.DriverManager('zaqar.notification.tasks',
|
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||||
s_type,
|
s_type,
|
||||||
invoke_on_load=True)
|
invoke_on_load=True)
|
||||||
self.executor.submit(mgr.driver.execute, sub, messages,
|
self.executor.submit(mgr.driver.execute, sub, messages,
|
||||||
conf=conf)
|
conf=data_driver.conf)
|
||||||
else:
|
else:
|
||||||
LOG.error('Failed to get subscription controller.')
|
LOG.error('Failed to get subscription controller.')
|
||||||
|
Loading…
Reference in New Issue
Block a user