Fix LDAP auth

LDAP auth has been broken during the configuration layout changes and
did not have enough tests to catch the issue.
This commit fixes both problems.

Change-Id: I9ee1750c42e61b37dd569102f170aefbff3e77af
Closes-bug: 1491531
This commit is contained in:
Stanisław Pitucha 2015-09-17 16:01:03 +10:00
parent 7623347163
commit 2993d5e265
2 changed files with 136 additions and 8 deletions

View File

@ -12,6 +12,7 @@
# under the License.
from __future__ import absolute_import
import logging
import ldap3
from ldap3.utils import dn
@ -20,6 +21,9 @@ from anchor.auth import results
from anchor import jsonloader
logger = logging.getLogger(__name__)
def user_get_groups(attributes):
"""Retrieve the group membership
@ -27,37 +31,44 @@ def user_get_groups(attributes):
:returns: List -- A list of groups that the user is a member of
"""
groups = attributes.get('memberOf', [])
logger.error("!!! groups: '%s'", groups)
group_dns = [dn.parse_dn(g) for g in groups]
return set(x[0][1] for x in group_dns if x[1] == ('OU', 'Groups', ','))
return [x[0][1] for x in group_dns if x[1] == ('OU', 'Groups', ',')]
def login(user, secret):
def login(ra_name, user, secret):
"""Attempt to Authenitcate user using LDAP
:param user: Username
:param secret: Secret/Passphrase
:returns: AuthDetails -- Class used for authentication information
"""
ldap_port = int(jsonloader.conf.auth['ldap'].get('port', 389))
use_ssl = jsonloader.conf.auth['ldap'].get('ssl', ldap_port == 636)
conf = jsonloader.authentication_for_registration_authority(ra_name)
ldap_port = int(conf.get('port', 389))
use_ssl = conf.get('ssl', ldap_port == 636)
lds = ldap3.Server(jsonloader.conf.auth['ldap']['host'], port=ldap_port,
lds = ldap3.Server(conf['host'], port=ldap_port,
get_info=ldap3.ALL, use_ssl=use_ssl)
try:
ldap_user = "%s@%s" % (user, jsonloader.conf.auth['ldap']['domain'])
ldap_user = "%s@%s" % (user, conf['domain'])
ldc = ldap3.Connection(lds, auto_bind=True, client_strategy=ldap3.SYNC,
user=ldap_user, password=secret,
authentication=ldap3.SIMPLE, check_names=True)
filter_str = ('(sAMAccountName=%s)' %
ldap3.utils.conv.escape_bytes(user))
ldc.search(jsonloader.conf.auth['ldap']['base'], filter_str,
ldc.search(conf['base'], filter_str,
ldap3.SUBTREE, attributes=['memberOf'])
if ldc.result['result'] != 0:
return None
user_attrs = ldc.response[0]['attributes']
user_groups = user_get_groups(user_attrs)
return results.AuthDetails(username=user, groups=user_groups)
except ldap3.LDAPBindError:
except ldap3.LDAPSocketOpenError:
logger.error("cannot connect to LDAP host '%s' (authority '%s')",
conf['host'], ra_name)
return None
except ldap3.LDAPBindError:
logger.info("failed ldap auth for user %s", user)
return None

117
tests/auth/test_ldap.py Normal file
View File

@ -0,0 +1,117 @@
# -*- coding:utf-8 -*-
#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import ldap3
import mock
from webob import exc as http_status
from anchor import auth
from anchor.auth import results
from anchor import jsonloader
import tests
class AuthLdapTests(tests.DefaultConfigMixin, unittest.TestCase):
def setUp(self):
super(AuthLdapTests, self).setUp()
self.sample_conf_auth['default_auth'] = {
"backend": "ldap",
"host": "ldap.example.com",
"base": "CN=Users,DC=example,DC=com",
"domain": "example.com",
"port": 636,
"ssl": True
}
def tearDown(self):
pass
@mock.patch('ldap3.Connection')
def test_login_good(self, mock_connection):
"""Test all static user/pass authentication paths."""
jsonloader.conf.load_extensions()
config = "anchor.jsonloader.conf._config"
mock_ldc = mock.Mock()
mock_connection.return_value = mock_ldc
mock_ldc.result = {'result': 0}
mock_ldc.response = [{'attributes': {}}]
with mock.patch.dict(config, self.sample_conf):
expected = results.AuthDetails(username='user', groups=[])
self.assertEqual(auth.validate('default_ra', 'user', 'pass'),
expected)
@mock.patch('ldap3.Connection')
def test_login_good_with_groups(self, mock_connection):
"""Test all static user/pass authentication paths."""
jsonloader.conf.load_extensions()
config = "anchor.jsonloader.conf._config"
mock_ldc = mock.Mock()
mock_connection.return_value = mock_ldc
mock_ldc.result = {'result': 0}
mock_ldc.response = [{'attributes': {'memberOf': [
u'CN=some_group,OU=Groups,DC=example,DC=com',
u'CN=other_group,OU=Groups,DC=example,DC=com']}}]
with mock.patch.dict(config, self.sample_conf):
expected = results.AuthDetails(
username='user',
groups=[u'some_group', u'other_group'])
self.assertEqual(auth.validate('default_ra', 'user', 'pass'),
expected)
@mock.patch('ldap3.Connection')
def test_login_search_fail(self, mock_connection):
"""Test all static user/pass authentication paths."""
jsonloader.conf.load_extensions()
config = "anchor.jsonloader.conf._config"
mock_ldc = mock.Mock()
mock_connection.return_value = mock_ldc
mock_ldc.result = {'result': 1}
with mock.patch.dict(config, self.sample_conf):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('default_ra', 'user', 'pass')
@mock.patch('ldap3.Connection')
def test_login_bind_fail(self, mock_connection):
"""Test all static user/pass authentication paths."""
jsonloader.conf.load_extensions()
config = "anchor.jsonloader.conf._config"
mock_connection.side_effect = ldap3.LDAPBindError()
with mock.patch.dict(config, self.sample_conf):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('default_ra', 'user', 'pass')
@mock.patch('ldap3.Connection')
def test_login_connection_fail(self, mock_connection):
"""Test all static user/pass authentication paths."""
jsonloader.conf.load_extensions()
config = "anchor.jsonloader.conf._config"
mock_connection.side_effect = ldap3.LDAPSocketOpenError()
with mock.patch.dict(config, self.sample_conf):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('default_ra', 'user', 'pass')