Add pre-signed support for zaqar notifier

Currently the zaqar notifer driver will create a queue per alarm
in service tenant, as a result, end user can't access the queue
from zaqar. Which makes it's hard to add a subscriber like mistral
because it needs a complex subscription options. The patch will
provide a capabability for end user let aodh forward the alarms
to user's queue instead of the queue created automatically in
service tenant. And this patch could benefit any 3rd party which
interested in the alarm information.

Change-Id: I4b843b39c8a13bd40aa2923a62ba681c81e06e9a
This commit is contained in:
Fei Long Wang 2016-06-29 15:23:04 +12:00 committed by Julien Danjou
parent e0dec8273e
commit 9ee22bc9e2
3 changed files with 82 additions and 18 deletions

View File

@ -40,17 +40,21 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
super(ZaqarAlarmNotifier, self).__init__(conf)
self.conf = conf
self._zclient = None
self._zendpoint = None
def _get_endpoint(self):
try:
ks_client = keystone_client.get_client(self.conf)
return ks_client.service_catalog.url_for(
service_type=self.conf.service_types.zaqar,
endpoint_type=self.conf.service_credentials.os_endpoint_type)
except Exception:
LOG.error(_LE("Aodh was configured to use zaqar:// action,"
" but Zaqar endpoint could not be found in Keystone"
" service catalog."))
if self._zendpoint is None:
try:
ks_client = keystone_client.get_client(self.conf)
endpoint_type = self.conf.service_credentials.os_endpoint_type
self._zendpoint = ks_client.service_catalog.url_for(
service_type=self.conf.service_types.zaqar,
endpoint_type=endpoint_type)
except Exception:
LOG.error(_LE("Aodh was configured to use zaqar:// action,"
" but Zaqar endpoint could not be found in"
" Keystone service catalog."))
return self._zendpoint
def get_zaqar_client(self):
conf = self.conf.service_credentials
@ -69,7 +73,38 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
try:
from zaqarclient.queues import client as zaqar_client
return zaqar_client.Client(self._get_endpoint(),
version=1.1, conf=params)
version=2, conf=params)
except Exception:
LOG.error(_LE("Failed to connect to Zaqar service "),
exc_info=True)
def get_presigned_client(self, queue_info):
queue_name = queue_info.get('queue_name', [''])[0]
if not queue_name:
return None, None
signature = queue_info.get('signature', [''])[0]
expires = queue_info.get('expires', [''])[0]
paths = queue_info.get('paths', [''])[0].split(',')
methods = queue_info.get('methods', [''])[0].split(',')
project_id = queue_info.get('project_id', [''])[0]
params = {
'auth_opts': {
'backend': 'signed-url',
'options': {
'signature': signature,
'expires': expires,
'methods': methods,
'paths': paths,
'os_project_id': project_id
}
}
}
try:
from zaqarclient.queues import client as zaqar_client
return (zaqar_client.Client(self._get_endpoint(),
version=2, conf=params),
queue_name)
except Exception:
LOG.error(_LE("Failed to connect to Zaqar service "),
exc_info=True)
@ -101,21 +136,28 @@ class ZaqarAlarmNotifier(notifier.AlarmNotifier):
def notify_zaqar(self, action, message):
queue_info = urlparse.parse_qs(action.query)
try:
# queue_name is a combination of <alarm-id>-<topic>
queue_name = "%s-%s" % (message['body']['alarm_id'],
queue_info.get('topic')[-1])
# NOTE(flwang): Try to get build a pre-signed client if user has
# provide enough information about that. Otherwise, go to build
# a client with service account and queue name for this alarm.
zaqar_client, queue_name = self.get_presigned_client(queue_info)
if not zaqar_client or not queue_name:
zaqar_client = self.client
# queue_name is a combination of <alarm-id>-<topic>
queue_name = "%s-%s" % (message['body']['alarm_id'],
queue_info.get('topic')[-1])
# create a queue in zaqar
queue = self.client.queue(queue_name, force_create=True)
queue = zaqar_client.queue(queue_name)
subscriber_list = queue_info.get('subscriber', [])
ttl = queue_info.get('ttl', [3600])[-1]
ttl = int(queue_info.get('ttl', ['3600'])[-1])
for subscriber in subscriber_list:
# add subscriber to the zaqar queue
subscription_data = dict(subscriber=subscriber,
ttl=ttl)
self.client.subscription(queue_name,
**subscription_data)
zaqar_client.subscription(queue_name, **subscription_data)
# post the message to the queue
queue.post(message)
except IndexError:

View File

@ -389,6 +389,25 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
self.assertEqual(self.zaqar,
self.service.notifiers['zaqar'].obj.client)
def test_presigned_zaqar_notifier_action(self):
with mock.patch.object(notifier.zaqar.ZaqarAlarmNotifier,
'get_presigned_client') as zaqar_client:
zaqar_client.return_value = self.zaqar, 'foobar-critical'
action = 'zaqar://?topic=critical&' \
'subscriber=http://example.com/data' \
'&subscriber=mailto:foo@example.com&ttl=7200' \
'&signature=mysignature&expires=2016-06-29T01:49:56' \
'&paths=/v2/queues/beijing/messages' \
'&methods=GET,PATCH,POST,PUT&queue_name=foobar-critical' \
'&project_id=my_project_id'
self._msg_notifier.sample({}, 'alarm.update',
self._notification(action))
time.sleep(1)
self.assertEqual(zaqar_client.return_value[0],
self.service.notifiers['zaqar'].obj.client)
queue_info = urlparse.parse_qs(urlparse.urlparse(action).query)
zaqar_client.assert_called_with(queue_info)
class FakeZaqarClient(object):

View File

@ -50,6 +50,9 @@ hbase =
# Required for bson
pymongo>=3.0.2
zaqar =
python-zaqarclient>=1.2.0
doc =
oslosphinx>=2.5.0 # Apache-2.0
reno>=0.1.1 # Apache2