Merge "Adding more tests against X509 certificate code"
This commit is contained in:
commit
6b060e5ad6
@ -11,6 +11,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
import errors
|
||||
import message_digest
|
||||
@ -68,7 +70,6 @@ class X509Certificate(object):
|
||||
self._lib.X509_free(self._certObj)
|
||||
|
||||
def _asn1_utctime(self, t):
|
||||
# asn1_utctime = self._lib.ASN1_UTCTIME_new()
|
||||
asn1_utctime = self._lib.ASN1_UTCTIME_set(self._ffi.NULL, t)
|
||||
if asn1_utctime == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not create ASN1_UTCTIME "
|
||||
@ -76,6 +77,22 @@ class X509Certificate(object):
|
||||
|
||||
return asn1_utctime
|
||||
|
||||
def _python_utctime(self, t):
|
||||
bio = self._lib.BIO_new(self._lib.BIO_s_mem())
|
||||
bio = self._ffi.gc(bio, self._lib.BIO_free)
|
||||
|
||||
val = self._lib.ASN1_UTCTIME_print(bio, t)
|
||||
if val != 1:
|
||||
raise X509CertificateError("Could not print"
|
||||
" ASN1_UTCTIME") # pragma: no cover
|
||||
size = 1024
|
||||
data = self._ffi.new("char[]", size)
|
||||
self._lib.BIO_gets(bio, data, size)
|
||||
data = self._ffi.string(data)
|
||||
|
||||
val = time.strptime(data, "%b %d %H:%M:%S %Y %Z")
|
||||
return time.mktime(val) # seconds since the epoch
|
||||
|
||||
def from_buffer(self, data):
|
||||
"""Build this X509 object from a data buffer in memory.
|
||||
|
||||
@ -130,29 +147,45 @@ class X509Certificate(object):
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"version.") # pragma: no cover
|
||||
|
||||
def get_version(self):
|
||||
"""Get the version of this X509 certificate object."""
|
||||
return self._lib.X509_get_version(self._certObj)
|
||||
|
||||
def set_not_before(self, t):
|
||||
"""Set the 'not before' date field.
|
||||
|
||||
:param t: a Python date-time object
|
||||
:param t: time in seconds since the epoch
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notBefore(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not before time.")
|
||||
"not before time.") # pragma: no cover
|
||||
|
||||
def get_not_before(self):
|
||||
"""Get the 'not before' date field as seconds since the epoch."""
|
||||
not_before = self._lib.X509_get_notBefore(self._certObj)
|
||||
not_before = self._ffi.cast("ASN1_UTCTIME*", not_before)
|
||||
return self._python_utctime(not_before)
|
||||
|
||||
def set_not_after(self, t):
|
||||
"""Set the 'not after' date field.
|
||||
|
||||
:param t: a Python date-time object
|
||||
:param t: time in seconds since the epoch
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notAfter(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not after time.")
|
||||
"not after time.") # pragma: no cover
|
||||
|
||||
def get_not_after(self):
|
||||
"""Get the 'not after' date field as seconds since the epoch."""
|
||||
not_after = self._lib.X509_get_notAfter(self._certObj)
|
||||
not_after = self._ffi.cast("ASN1_UTCTIME*", not_after)
|
||||
return self._python_utctime(not_after)
|
||||
|
||||
def set_pubkey(self, pkey):
|
||||
"""Set the public key field.
|
||||
@ -172,7 +205,7 @@ class X509Certificate(object):
|
||||
val = self._lib.X509_get_subject_name(self._certObj)
|
||||
if val == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not get subject from X509 "
|
||||
"certificate.")
|
||||
"certificate.") # pragma: no cover
|
||||
|
||||
return name.X509Name(val)
|
||||
|
||||
@ -185,7 +218,7 @@ class X509Certificate(object):
|
||||
ret = self._lib.X509_set_subject_name(self._certObj, val)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"subject.")
|
||||
"subject.") # pragma: no cover
|
||||
|
||||
def set_issuer(self, issuer):
|
||||
"""Set the issuer name field value.
|
||||
@ -196,7 +229,7 @@ class X509Certificate(object):
|
||||
ret = self._lib.X509_set_issuer_name(self._certObj, val)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"issuer.")
|
||||
"issuer.") # pragma: no cover
|
||||
|
||||
def get_issuer(self):
|
||||
"""Get the issuer name field value.
|
||||
@ -206,7 +239,7 @@ class X509Certificate(object):
|
||||
val = self._lib.X509_get_issuer_name(self._certObj)
|
||||
if val == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not get subject from X509 "
|
||||
"certificate.")
|
||||
"certificate.") # pragma: no cover
|
||||
return name.X509Name(val)
|
||||
|
||||
def set_serial_number(self, serial):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
@ -248,9 +249,9 @@ class TestX509Cert(unittest.TestCase):
|
||||
mock_open.return_value = mock.MagicMock(spec=file)
|
||||
m_file = mock_open.return_value.__enter__.return_value
|
||||
m_file.read.return_value = TestX509Cert.cert_data
|
||||
|
||||
cert = certificate.X509Certificate()
|
||||
cert.from_file("some_path")
|
||||
|
||||
name = cert.get_subject()
|
||||
entries = name.get_entries_by_nid_name('C')
|
||||
self.assertEqual(entries[0].get_value(), "UK")
|
||||
@ -268,3 +269,32 @@ class TestX509Cert(unittest.TestCase):
|
||||
self.assertRaises(x509_errors.X509Error,
|
||||
self.cert.sign,
|
||||
self.cert._ffi.NULL)
|
||||
|
||||
def test_get_version(self):
|
||||
v = self.cert.get_version()
|
||||
self.assertEqual(v, 2)
|
||||
|
||||
def test_set_version(self):
|
||||
self.cert.set_version(5)
|
||||
v = self.cert.get_version()
|
||||
self.assertEqual(v, 5)
|
||||
|
||||
def test_get_not_before(self):
|
||||
val = self.cert.get_not_before()
|
||||
self.assertEqual(1421244619.0, val)
|
||||
|
||||
def test_set_not_before(self):
|
||||
self.cert.set_not_before(0) # seconds since epoch
|
||||
val = self.cert.get_not_before()
|
||||
tst = time.mktime(time.gmtime(0))
|
||||
self.assertEqual(tst, val)
|
||||
|
||||
def test_get_not_after(self):
|
||||
val = self.cert.get_not_after()
|
||||
self.assertEqual(1421331019.0, val)
|
||||
|
||||
def test_set_not_after(self):
|
||||
self.cert.set_not_after(0) # seconds since epoch
|
||||
val = self.cert.get_not_after()
|
||||
tst = time.mktime(time.gmtime(0))
|
||||
self.assertEqual(tst, val)
|
||||
|
Loading…
x
Reference in New Issue
Block a user