Merge "Add pre-signed support for zaqar notifier"
This commit is contained in:
commit
caf9f2e1c2
@ -40,17 +40,21 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
|
|||||||
super(ZaqarAlarmNotifier, self).__init__(conf)
|
super(ZaqarAlarmNotifier, self).__init__(conf)
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self._zclient = None
|
self._zclient = None
|
||||||
|
self._zendpoint = None
|
||||||
|
|
||||||
def _get_endpoint(self):
|
def _get_endpoint(self):
|
||||||
|
if self._zendpoint is None:
|
||||||
try:
|
try:
|
||||||
ks_client = keystone_client.get_client(self.conf)
|
ks_client = keystone_client.get_client(self.conf)
|
||||||
return ks_client.service_catalog.url_for(
|
endpoint_type = self.conf.service_credentials.os_endpoint_type
|
||||||
|
self._zendpoint = ks_client.service_catalog.url_for(
|
||||||
service_type=self.conf.service_types.zaqar,
|
service_type=self.conf.service_types.zaqar,
|
||||||
endpoint_type=self.conf.service_credentials.os_endpoint_type)
|
endpoint_type=endpoint_type)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE("Aodh was configured to use zaqar:// action,"
|
LOG.error(_LE("Aodh was configured to use zaqar:// action,"
|
||||||
" but Zaqar endpoint could not be found in Keystone"
|
" but Zaqar endpoint could not be found in"
|
||||||
" service catalog."))
|
" Keystone service catalog."))
|
||||||
|
return self._zendpoint
|
||||||
|
|
||||||
def get_zaqar_client(self):
|
def get_zaqar_client(self):
|
||||||
conf = self.conf.service_credentials
|
conf = self.conf.service_credentials
|
||||||
@ -69,7 +73,38 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
|
|||||||
try:
|
try:
|
||||||
from zaqarclient.queues import client as zaqar_client
|
from zaqarclient.queues import client as zaqar_client
|
||||||
return zaqar_client.Client(self._get_endpoint(),
|
return zaqar_client.Client(self._get_endpoint(),
|
||||||
version=1.1, conf=params)
|
version=2, conf=params)
|
||||||
|
except Exception:
|
||||||
|
LOG.error(_LE("Failed to connect to Zaqar service "),
|
||||||
|
exc_info=True)
|
||||||
|
|
||||||
|
def get_presigned_client(self, queue_info):
|
||||||
|
queue_name = queue_info.get('queue_name', [''])[0]
|
||||||
|
if not queue_name:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
signature = queue_info.get('signature', [''])[0]
|
||||||
|
expires = queue_info.get('expires', [''])[0]
|
||||||
|
paths = queue_info.get('paths', [''])[0].split(',')
|
||||||
|
methods = queue_info.get('methods', [''])[0].split(',')
|
||||||
|
project_id = queue_info.get('project_id', [''])[0]
|
||||||
|
params = {
|
||||||
|
'auth_opts': {
|
||||||
|
'backend': 'signed-url',
|
||||||
|
'options': {
|
||||||
|
'signature': signature,
|
||||||
|
'expires': expires,
|
||||||
|
'methods': methods,
|
||||||
|
'paths': paths,
|
||||||
|
'os_project_id': project_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
from zaqarclient.queues import client as zaqar_client
|
||||||
|
return (zaqar_client.Client(self._get_endpoint(),
|
||||||
|
version=2, conf=params),
|
||||||
|
queue_name)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE("Failed to connect to Zaqar service "),
|
LOG.error(_LE("Failed to connect to Zaqar service "),
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
@ -101,21 +136,28 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
|
|||||||
|
|
||||||
def notify_zaqar(self, action, message):
|
def notify_zaqar(self, action, message):
|
||||||
queue_info = urlparse.parse_qs(action.query)
|
queue_info = urlparse.parse_qs(action.query)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# NOTE(flwang): Try to get build a pre-signed client if user has
|
||||||
|
# provide enough information about that. Otherwise, go to build
|
||||||
|
# a client with service account and queue name for this alarm.
|
||||||
|
zaqar_client, queue_name = self.get_presigned_client(queue_info)
|
||||||
|
|
||||||
|
if not zaqar_client or not queue_name:
|
||||||
|
zaqar_client = self.client
|
||||||
# queue_name is a combination of <alarm-id>-<topic>
|
# queue_name is a combination of <alarm-id>-<topic>
|
||||||
queue_name = "%s-%s" % (message['body']['alarm_id'],
|
queue_name = "%s-%s" % (message['body']['alarm_id'],
|
||||||
queue_info.get('topic')[-1])
|
queue_info.get('topic')[-1])
|
||||||
|
|
||||||
# create a queue in zaqar
|
# create a queue in zaqar
|
||||||
queue = self.client.queue(queue_name, force_create=True)
|
queue = zaqar_client.queue(queue_name)
|
||||||
|
|
||||||
subscriber_list = queue_info.get('subscriber', [])
|
subscriber_list = queue_info.get('subscriber', [])
|
||||||
ttl = queue_info.get('ttl', [3600])[-1]
|
ttl = int(queue_info.get('ttl', ['3600'])[-1])
|
||||||
for subscriber in subscriber_list:
|
for subscriber in subscriber_list:
|
||||||
# add subscriber to the zaqar queue
|
# add subscriber to the zaqar queue
|
||||||
subscription_data = dict(subscriber=subscriber,
|
subscription_data = dict(subscriber=subscriber,
|
||||||
ttl=ttl)
|
ttl=ttl)
|
||||||
self.client.subscription(queue_name,
|
zaqar_client.subscription(queue_name, **subscription_data)
|
||||||
**subscription_data)
|
|
||||||
# post the message to the queue
|
# post the message to the queue
|
||||||
queue.post(message)
|
queue.post(message)
|
||||||
except IndexError:
|
except IndexError:
|
||||||
|
@ -386,6 +386,25 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
|
|||||||
self.assertEqual(self.zaqar,
|
self.assertEqual(self.zaqar,
|
||||||
self.service.notifiers['zaqar'].obj.client)
|
self.service.notifiers['zaqar'].obj.client)
|
||||||
|
|
||||||
|
def test_presigned_zaqar_notifier_action(self):
|
||||||
|
with mock.patch.object(notifier.zaqar.ZaqarAlarmNotifier,
|
||||||
|
'get_presigned_client') as zaqar_client:
|
||||||
|
zaqar_client.return_value = self.zaqar, 'foobar-critical'
|
||||||
|
action = 'zaqar://?topic=critical&' \
|
||||||
|
'subscriber=http://example.com/data' \
|
||||||
|
'&subscriber=mailto:foo@example.com&ttl=7200' \
|
||||||
|
'&signature=mysignature&expires=2016-06-29T01:49:56' \
|
||||||
|
'&paths=/v2/queues/beijing/messages' \
|
||||||
|
'&methods=GET,PATCH,POST,PUT&queue_name=foobar-critical' \
|
||||||
|
'&project_id=my_project_id'
|
||||||
|
self._msg_notifier.sample({}, 'alarm.update',
|
||||||
|
self._notification(action))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertEqual(zaqar_client.return_value[0],
|
||||||
|
self.service.notifiers['zaqar'].obj.client)
|
||||||
|
queue_info = urlparse.parse_qs(urlparse.urlparse(action).query)
|
||||||
|
zaqar_client.assert_called_with(queue_info)
|
||||||
|
|
||||||
|
|
||||||
class FakeZaqarClient(object):
|
class FakeZaqarClient(object):
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user