Fix trust notifier
The keystone methods to get a client with a trust is broken, and simply use service credentials while ignoring the trust. We can't load the options directly from the configuration, we need to build a auth plugin manually. Change-Id: Ic5edb144eac3a4b30b9835bf251f25f65b8c29b5
This commit is contained in:
parent
77238e3367
commit
ff3d24ecbd
@ -434,8 +434,7 @@ class Alarm(base.Base):
|
||||
auth_plugin = pecan.request.environ.get('keystone.token_auth')
|
||||
url = netutils.urlsplit(action)
|
||||
if self._is_trust_url(url) and url.password:
|
||||
keystone_client.delete_trust_id(pecan.request.cfg,
|
||||
url.username, auth_plugin)
|
||||
keystone_client.delete_trust_id(url.username, auth_plugin)
|
||||
|
||||
|
||||
Alarm.add_attributes(**{"%s_rule" % ext.name: ext.plugin
|
||||
|
@ -16,7 +16,9 @@
|
||||
import os
|
||||
|
||||
from keystoneauth1 import exceptions as ka_exception
|
||||
from keystoneauth1.identity.generic import password
|
||||
from keystoneauth1 import loading as ka_loading
|
||||
from keystoneclient import session
|
||||
from keystoneclient.v3 import client as ks_client_v3
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
@ -26,36 +28,42 @@ LOG = log.getLogger(__name__)
|
||||
CFG_GROUP = "service_credentials"
|
||||
|
||||
|
||||
def get_session(conf, requests_session=None):
|
||||
def get_session(conf):
|
||||
"""Get an aodh service credentials auth session."""
|
||||
auth_plugin = ka_loading.load_auth_from_conf_options(conf, CFG_GROUP)
|
||||
session = ka_loading.load_session_from_conf_options(
|
||||
conf, CFG_GROUP, auth=auth_plugin, session=requests_session
|
||||
return ka_loading.load_session_from_conf_options(
|
||||
conf, CFG_GROUP, auth=auth_plugin
|
||||
)
|
||||
return session
|
||||
|
||||
|
||||
def get_client(conf, trust_id=None, requests_session=None):
|
||||
"""Return a client for keystone v3 endpoint, optionally using a trust."""
|
||||
session = get_session(conf, requests_session=requests_session)
|
||||
return ks_client_v3.Client(session=session, trust_id=trust_id)
|
||||
def get_client(conf):
|
||||
"""Return a client for keystone v3 endpoint."""
|
||||
sess = get_session(conf)
|
||||
return ks_client_v3.Client(session=sess)
|
||||
|
||||
|
||||
def get_service_catalog(client):
|
||||
return client.session.auth.get_access(client.session).service_catalog
|
||||
def get_trusted_client(conf, trust_id):
|
||||
# Ideally we would use load_session_from_conf_options, but we can't do that
|
||||
# *and* specify a trust, so let's create the object manually.
|
||||
auth_plugin = password.Password(
|
||||
username=conf[CFG_GROUP].username,
|
||||
password=conf[CFG_GROUP].password,
|
||||
auth_url=conf[CFG_GROUP].auth_url,
|
||||
user_domain_id=conf[CFG_GROUP].user_domain_id,
|
||||
trust_id=trust_id)
|
||||
|
||||
sess = session.Session(auth=auth_plugin)
|
||||
return ks_client_v3.Client(session=sess)
|
||||
|
||||
|
||||
def get_auth_token(client):
|
||||
return client.session.auth.get_access(client.session).auth_token
|
||||
|
||||
|
||||
def get_client_on_behalf_user(conf, auth_plugin, trust_id=None,
|
||||
requests_session=None):
|
||||
"""Return a client for keystone v3 endpoint, optionally using a trust."""
|
||||
session = ka_loading.load_session_from_conf_options(
|
||||
conf, CFG_GROUP, auth=auth_plugin, session=requests_session
|
||||
)
|
||||
return ks_client_v3.Client(session=session, trust_id=trust_id)
|
||||
def get_client_on_behalf_user(auth_plugin):
|
||||
"""Return a client for keystone v3 endpoint."""
|
||||
sess = session.Session(auth=auth_plugin)
|
||||
return ks_client_v3.Client(session=sess)
|
||||
|
||||
|
||||
def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
|
||||
@ -64,7 +72,7 @@ def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
|
||||
admin_client = get_client(conf)
|
||||
trustee_user_id = admin_client.session.get_user_id()
|
||||
|
||||
client = get_client_on_behalf_user(conf, auth_plugin=auth_plugin)
|
||||
client = get_client_on_behalf_user(auth_plugin)
|
||||
trust = client.trusts.create(trustor_user=trustor_user_id,
|
||||
trustee_user=trustee_user_id,
|
||||
project=trustor_project_id,
|
||||
@ -73,9 +81,9 @@ def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
|
||||
return trust.id
|
||||
|
||||
|
||||
def delete_trust_id(conf, trust_id, auth_plugin):
|
||||
def delete_trust_id(trust_id, auth_plugin):
|
||||
"""Delete a trust previously setup for the aodh user."""
|
||||
client = get_client_on_behalf_user(conf, auth_plugin=auth_plugin)
|
||||
client = get_client_on_behalf_user(auth_plugin)
|
||||
try:
|
||||
client.trusts.delete(trust_id)
|
||||
except ka_exception.NotFound:
|
||||
|
@ -34,7 +34,7 @@ class TrustRestAlarmNotifier(rest.RestAlarmNotifier):
|
||||
reason, reason_data):
|
||||
trust_id = action.username
|
||||
|
||||
client = keystone_client.get_client(self.conf, trust_id)
|
||||
client = keystone_client.get_trusted_client(self.conf, trust_id)
|
||||
|
||||
# Remove the fake user
|
||||
netloc = action.netloc.split("@")[1]
|
||||
|
@ -340,8 +340,9 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
|
||||
client = mock.MagicMock()
|
||||
client.session.auth.get_access.return_value.auth_token = 'token_1234'
|
||||
|
||||
self.useFixture(mockpatch.Patch('keystoneclient.v3.client.Client',
|
||||
lambda **kwargs: client))
|
||||
self.useFixture(
|
||||
mockpatch.Patch('aodh.keystone_client.get_trusted_client',
|
||||
lambda *args: client))
|
||||
|
||||
with mock.patch.object(requests.Session, 'post') as poster:
|
||||
self._msg_notifier.sample({}, 'alarm.update',
|
||||
|
Loading…
x
Reference in New Issue
Block a user