Add key size validator

Add a validator for the public key sizes. This allows to reject a
request with a 512b long RSA key for example.

Change-Id: Ib4988e595c4c5cdc643af56e9529e8c0de31d993
This commit is contained in:
Stanisław Pitucha 2015-10-16 10:30:59 +11:00
parent f826a63151
commit d171715798
4 changed files with 92 additions and 3 deletions

View File

@ -171,6 +171,20 @@ class X509Csr(signature.SignatureMixin):
ext_attr['vals'][0] = encoder.encode(exts)
def get_public_key_algo(self):
csr_info = self._csr['certificationRequestInfo']
key_info = csr_info['subjectPublicKeyInfo']
return key_info['algorithm']['algorithm']
def get_public_key_size(self):
return self._get_public_key().key_size
def get_public_key(self):
return self._get_public_key()
def get_signing_algorithm(self):
return self._get_signing_algorithm()
def _get_signature(self):
return utils.bin_to_bytes(self._csr['signature'])
@ -180,9 +194,7 @@ class X509Csr(signature.SignatureMixin):
def _get_public_key(self):
csr_info = self._csr['certificationRequestInfo']
key_info = csr_info['subjectPublicKeyInfo']
csr_public_key = key_info['subjectPublicKey']
return utils.get_public_key_from_der(
utils.bin_to_bytes(csr_public_key))
return utils.get_public_key_from_der(encoder.encode(key_info))
def _get_bytes_to_sign(self):
return encoder.encode(self._csr['certificationRequestInfo'])

View File

@ -17,6 +17,8 @@ import logging
import netaddr
from pyasn1.type import univ as pyasn1_univ
from pyasn1_modules import rfc2437 # PKCS#1
from pyasn1_modules import rfc2459 # X509
from anchor.validators import errors as v_errors
from anchor.validators import utils
@ -236,3 +238,32 @@ def csr_signature(csr=None, **kwargs):
raise v_errors.ValidationError("Signature on the CSR is not valid")
except errors.X509Error:
raise v_errors.ValidationError("Signature on the CSR is not valid")
def public_key(csr=None, allowed_keys=None, **kwargs):
"""Ensure the public key has the known type and size.
Configuration provides a dictionary of key types and minimum sizes.
"""
if allowed_keys is None or not isinstance(allowed_keys, dict):
raise v_errors.ValidationError("Allowed keys configuration missing")
algo = csr.get_public_key_algo()
algo_names = {
rfc2437.rsaEncryption: 'RSA',
rfc2459.id_dsa: 'DSA',
}
algo_name = algo_names.get(algo)
if algo_name is None:
raise v_errors.ValidationError("Unknown public key type")
min_size = allowed_keys.get(algo_name)
if min_size is None:
raise v_errors.ValidationError(
"Key type not allowed (%s)" % (algo_name,))
if min_size == 0:
# key size is not enforced
return
if csr.get_public_key_size() < min_size:
raise v_errors.ValidationError("Key size too small")

View File

@ -120,6 +120,14 @@ The following validators are implemented at the moment:
Ensures the request comes from one of the ranges in `cidrs`.
``public_key``
Verifies: CSR. Parameters: ``allowed_keys``.
Ensures that only selected keys of a minimum specified length can be used
in the CSR. The ``allowed_keys`` parameter is a dictionary where keys are
the uppercase key names and values are minimum key lengths. Valid keys
at the moment are: ``RSA`` and ``DSA``.
Extension interface
-------------------

View File

@ -14,10 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import unittest
import mock
import netaddr
from pyasn1.codec.der import decoder
from pyasn1_modules import rfc2459
from anchor.validators import custom
@ -629,3 +631,39 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase):
with self.assertRaisesRegexp(errors.ValidationError,
"Signature on the CSR is not valid"):
custom.csr_signature(csr=csr)
def test_public_key_good_rsa(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
self.assertEqual(None, custom.public_key(csr=csr,
allowed_keys={'RSA': 1024}))
def test_public_key_good_dsa(self):
dsa_key_pem = """
MIIBtjCCASsGByqGSM44BAEwggEeAoGBAJv/ZwltxEMrACE71R+AvxOuvWgTIKAd
iVq9ATbcuiaMq5P+iyhsI0k5A29bLNxkU/kkUCBYEEOoM2R1+8eO6UVr40+dtVw8
OzqHI6nFVmWMNUDGdPFoIIWsh5KRavhgy3Z8CKDqvGf4hxR1QWEN4Jz51xtHS3fI
1SKJybWdu2ifAhUAgoQ1AiWH9zLU6AOafUdv6iNdxKsCgYA66IS+XsIZwQvkHJkA
rf9hbOGC8aZeuafm7PlU6C+7TRB+7hoPzrwkn0ROYhv3yGsFYKWBEjAorW/skNJQ
cmdPsZV9tGdkfyvj5lxmAAbu+4ofozUvwKlSvpa/e/PLY7aZCq8u+fSHsF+xpUNl
GlCRV1DL13tDWZb+XS8w7RD3EQOBhAACgYBu77erOhm/hF6l6u6wuyaM0GfgdMxg
eU5WnfcTJOzXXZBcv3cetn/OF0OG3e81R+/78xIjpx+b1q5bjXvqNRfZWr8Vov+Y
ox6WOB2kdxa+tRgpK1Bs6FqJgI7AWMYVSxgjpx+9Q/j6aZe6+r8m6k9HOU0cw+0L
7PFU2eVGvF/DYA==
"""
dsa_key_der = base64.b64decode(dsa_key_pem)
spki = decoder.decode(dsa_key_der,
asn1Spec=rfc2459.SubjectPublicKeyInfo())[0]
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
csr._csr['certificationRequestInfo']['subjectPublicKeyInfo'] = spki
self.assertEqual(None, custom.public_key(csr=csr,
allowed_keys={'DSA': 1024}))
def test_public_key_too_short(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
with self.assertRaises(errors.ValidationError):
custom.public_key(csr=csr, allowed_keys={'RSA': 99999999})
def test_public_key_wrong_algo(self):
csr = x509_csr.X509Csr.from_buffer(self.csr_sample)
with self.assertRaises(errors.ValidationError):
custom.public_key(csr=csr, allowed_keys={'XXX': 0})