From d17171579847ad18e0afeefa414d68d7442c2e14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Pitucha?= Date: Fri, 16 Oct 2015 10:30:59 +1100 Subject: [PATCH] Add key size validator Add a validator for the public key sizes. This allows to reject a request with a 512b long RSA key for example. Change-Id: Ib4988e595c4c5cdc643af56e9529e8c0de31d993 --- anchor/X509/signing_request.py | 18 ++++++++-- anchor/validators/custom.py | 31 ++++++++++++++++ doc/source/validators.rst | 8 +++++ tests/validators/test_callable_validators.py | 38 ++++++++++++++++++++ 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/anchor/X509/signing_request.py b/anchor/X509/signing_request.py index 88d0857..4124ad4 100644 --- a/anchor/X509/signing_request.py +++ b/anchor/X509/signing_request.py @@ -171,6 +171,20 @@ class X509Csr(signature.SignatureMixin): ext_attr['vals'][0] = encoder.encode(exts) + def get_public_key_algo(self): + csr_info = self._csr['certificationRequestInfo'] + key_info = csr_info['subjectPublicKeyInfo'] + return key_info['algorithm']['algorithm'] + + def get_public_key_size(self): + return self._get_public_key().key_size + + def get_public_key(self): + return self._get_public_key() + + def get_signing_algorithm(self): + return self._get_signing_algorithm() + def _get_signature(self): return utils.bin_to_bytes(self._csr['signature']) @@ -180,9 +194,7 @@ class X509Csr(signature.SignatureMixin): def _get_public_key(self): csr_info = self._csr['certificationRequestInfo'] key_info = csr_info['subjectPublicKeyInfo'] - csr_public_key = key_info['subjectPublicKey'] - return utils.get_public_key_from_der( - utils.bin_to_bytes(csr_public_key)) + return utils.get_public_key_from_der(encoder.encode(key_info)) def _get_bytes_to_sign(self): return encoder.encode(self._csr['certificationRequestInfo']) diff --git a/anchor/validators/custom.py b/anchor/validators/custom.py index 16eb83c..97de958 100644 --- a/anchor/validators/custom.py +++ b/anchor/validators/custom.py @@ -17,6 +17,8 @@ import logging import netaddr from pyasn1.type import univ as pyasn1_univ +from pyasn1_modules import rfc2437 # PKCS#1 +from pyasn1_modules import rfc2459 # X509 from anchor.validators import errors as v_errors from anchor.validators import utils @@ -236,3 +238,32 @@ def csr_signature(csr=None, **kwargs): raise v_errors.ValidationError("Signature on the CSR is not valid") except errors.X509Error: raise v_errors.ValidationError("Signature on the CSR is not valid") + + +def public_key(csr=None, allowed_keys=None, **kwargs): + """Ensure the public key has the known type and size. + + Configuration provides a dictionary of key types and minimum sizes. + """ + if allowed_keys is None or not isinstance(allowed_keys, dict): + raise v_errors.ValidationError("Allowed keys configuration missing") + + algo = csr.get_public_key_algo() + algo_names = { + rfc2437.rsaEncryption: 'RSA', + rfc2459.id_dsa: 'DSA', + } + algo_name = algo_names.get(algo) + if algo_name is None: + raise v_errors.ValidationError("Unknown public key type") + + min_size = allowed_keys.get(algo_name) + if min_size is None: + raise v_errors.ValidationError( + "Key type not allowed (%s)" % (algo_name,)) + if min_size == 0: + # key size is not enforced + return + + if csr.get_public_key_size() < min_size: + raise v_errors.ValidationError("Key size too small") diff --git a/doc/source/validators.rst b/doc/source/validators.rst index beb1bf4..e54c51a 100644 --- a/doc/source/validators.rst +++ b/doc/source/validators.rst @@ -120,6 +120,14 @@ The following validators are implemented at the moment: Ensures the request comes from one of the ranges in `cidrs`. +``public_key`` + Verifies: CSR. Parameters: ``allowed_keys``. + + Ensures that only selected keys of a minimum specified length can be used + in the CSR. The ``allowed_keys`` parameter is a dictionary where keys are + the uppercase key names and values are minimum key lengths. Valid keys + at the moment are: ``RSA`` and ``DSA``. + Extension interface ------------------- diff --git a/tests/validators/test_callable_validators.py b/tests/validators/test_callable_validators.py index a672bc6..6a4ce2e 100644 --- a/tests/validators/test_callable_validators.py +++ b/tests/validators/test_callable_validators.py @@ -14,10 +14,12 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 import unittest import mock import netaddr +from pyasn1.codec.der import decoder from pyasn1_modules import rfc2459 from anchor.validators import custom @@ -629,3 +631,39 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): with self.assertRaisesRegexp(errors.ValidationError, "Signature on the CSR is not valid"): custom.csr_signature(csr=csr) + + def test_public_key_good_rsa(self): + csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + self.assertEqual(None, custom.public_key(csr=csr, + allowed_keys={'RSA': 1024})) + + def test_public_key_good_dsa(self): + dsa_key_pem = """ + MIIBtjCCASsGByqGSM44BAEwggEeAoGBAJv/ZwltxEMrACE71R+AvxOuvWgTIKAd + iVq9ATbcuiaMq5P+iyhsI0k5A29bLNxkU/kkUCBYEEOoM2R1+8eO6UVr40+dtVw8 + OzqHI6nFVmWMNUDGdPFoIIWsh5KRavhgy3Z8CKDqvGf4hxR1QWEN4Jz51xtHS3fI + 1SKJybWdu2ifAhUAgoQ1AiWH9zLU6AOafUdv6iNdxKsCgYA66IS+XsIZwQvkHJkA + rf9hbOGC8aZeuafm7PlU6C+7TRB+7hoPzrwkn0ROYhv3yGsFYKWBEjAorW/skNJQ + cmdPsZV9tGdkfyvj5lxmAAbu+4ofozUvwKlSvpa/e/PLY7aZCq8u+fSHsF+xpUNl + GlCRV1DL13tDWZb+XS8w7RD3EQOBhAACgYBu77erOhm/hF6l6u6wuyaM0GfgdMxg + eU5WnfcTJOzXXZBcv3cetn/OF0OG3e81R+/78xIjpx+b1q5bjXvqNRfZWr8Vov+Y + ox6WOB2kdxa+tRgpK1Bs6FqJgI7AWMYVSxgjpx+9Q/j6aZe6+r8m6k9HOU0cw+0L + 7PFU2eVGvF/DYA== + """ + dsa_key_der = base64.b64decode(dsa_key_pem) + spki = decoder.decode(dsa_key_der, + asn1Spec=rfc2459.SubjectPublicKeyInfo())[0] + csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr._csr['certificationRequestInfo']['subjectPublicKeyInfo'] = spki + self.assertEqual(None, custom.public_key(csr=csr, + allowed_keys={'DSA': 1024})) + + def test_public_key_too_short(self): + csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + with self.assertRaises(errors.ValidationError): + custom.public_key(csr=csr, allowed_keys={'RSA': 99999999}) + + def test_public_key_wrong_algo(self): + csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + with self.assertRaises(errors.ValidationError): + custom.public_key(csr=csr, allowed_keys={'XXX': 0})