Sophie

Sophie

distrib > Fedora > 15 > i386 > by-pkgid > 5ccaeaf155c9e167f3a657dca111c2ac > files > 434

m2crypto-0.21.1-3.fc15.i686.rpm

#!/usr/bin/python

"""Unit tests for M2Crypto.X509.

Contributed by Toby Allsopp <toby@MI6.GEN.NZ> under M2Crypto's license.

Portions created by Open Source Applications Foundation (OSAF) are
Copyright (C) 2004-2005 OSAF. All Rights Reserved.
Author: Heikki Toivonen
"""

import unittest
import os, time, base64, sys
from M2Crypto import X509, EVP, RSA, Rand, ASN1, m2, util, BIO

class X509TestCase(unittest.TestCase):

    def callback(self, *args):
        pass

    def mkreq(self, bits, ca=0):
        pk = EVP.PKey()
        x = X509.Request()
        rsa = RSA.gen_key(bits, 65537, self.callback)
        pk.assign_rsa(rsa)
        rsa = None # should not be freed here
        x.set_pubkey(pk)
        name = x.get_subject()
        name.C = "UK"
        name.CN = "OpenSSL Group"
        if not ca:
            ext1 = X509.new_extension('subjectAltName', 'DNS:foobar.example.com')
            ext2 = X509.new_extension('nsComment', 'Hello there')
            extstack = X509.X509_Extension_Stack()
            extstack.push(ext1)
            extstack.push(ext2)
            x.add_extensions(extstack)
        self.assertRaises(ValueError, x.sign, pk, 'sha513')
        x.sign(pk,'sha1')
        assert x.verify(pk)
        pk2 = x.get_pubkey()
        assert x.verify(pk2)
        return x, pk

    def test_ext(self):
        self.assertRaises(ValueError, X509.new_extension,
                          'subjectKeyIdentifier', 'hash')
        ext = X509.new_extension('subjectAltName', 'DNS:foobar.example.com')
        assert ext.get_value() == 'DNS:foobar.example.com'
        assert ext.get_value(indent=2) == '  DNS:foobar.example.com'
        assert ext.get_value(flag=m2.X509V3_EXT_PARSE_UNKNOWN) == 'DNS:foobar.example.com'

    def test_extstack(self):
        # new
        ext1 = X509.new_extension('subjectAltName', 'DNS:foobar.example.com')
        ext2 = X509.new_extension('nsComment', 'Hello there')
        extstack = X509.X509_Extension_Stack()
        
        # push
        extstack.push(ext1)
        extstack.push(ext2)
        assert(extstack[1].get_name() == 'nsComment')
        assert len(extstack) == 2
        
        # iterator
        i = 0
        for e in extstack:
            i += 1
            assert len(e.get_name()) > 0
        assert i == 2
        
        # pop
        ext3 = extstack.pop()
        assert len(extstack) == 1
        assert(extstack[0].get_name() == 'subjectAltName')
        extstack.push(ext3)
        assert len(extstack) == 2
        assert(extstack[1].get_name() == 'nsComment')
        
        assert extstack.pop() is not None
        assert extstack.pop() is not None
        assert extstack.pop() is None

    def test_x509_name(self):
        n = X509.X509_Name()
        n.C = 'US' # It seems this actually needs to be a real 2 letter country code
        assert n.C == 'US'
        n.SP = 'State or Province'
        assert n.SP == 'State or Province'
        n.L = 'locality name'
        assert n.L == 'locality name'
        n.O = 'orhanization name'
        assert n.O == 'orhanization name'
        n.OU = 'org unit'
        assert n.OU == 'org unit'
        n.CN = 'common name'
        assert n.CN == 'common name'
        n.Email = 'bob@example.com'
        assert n.Email == 'bob@example.com'
        n.serialNumber = '1234'
        assert n.serialNumber == '1234'
        n.SN = 'surname'
        assert n.SN == 'surname'
        n.GN = 'given name'
        assert n.GN == 'given name'
        assert n.as_text() == 'C=US, ST=State or Province, L=locality name, O=orhanization name, OU=org unit, CN=common name/emailAddress=bob@example.com/serialNumber=1234, SN=surname, GN=given name', '"%s"' % n.as_text()
        assert len(n) == 10, len(n)
        n.givenName = 'name given'
        assert n.GN == 'given name' # Just gets the first
        assert n.as_text() == 'C=US, ST=State or Province, L=locality name, O=orhanization name, OU=org unit, CN=common name/emailAddress=bob@example.com/serialNumber=1234, SN=surname, GN=given name, GN=name given', '"%s"' % n.as_text()
        assert len(n) == 11, len(n)
        n.add_entry_by_txt(field="CN", type=ASN1.MBSTRING_ASC,
                           entry="Proxy", len=-1, loc=-1, set=0)
        assert len(n) == 12, len(n)
        assert n.entry_count() == 12, n.entry_count()
        assert n.as_text() == 'C=US, ST=State or Province, L=locality name, O=orhanization name, OU=org unit, CN=common name/emailAddress=bob@example.com/serialNumber=1234, SN=surname, GN=given name, GN=name given, CN=Proxy', '"%s"' % n.as_text()

        self.assertRaises(AttributeError, n.__getattr__, 'foobar')
        n.foobar = 1
        assert n.foobar == 1, n.foobar
        
        # X509_Name_Entry tests
        l = 0
        for entry in n:
            assert isinstance(entry, X509.X509_Name_Entry), entry
            assert isinstance(entry.get_object(), ASN1.ASN1_Object), entry
            assert isinstance(entry.get_data(), ASN1.ASN1_String), entry
            l += 1
        assert l == 12, l
        
        l = 0
        for cn in n.get_entries_by_nid(m2.NID_commonName):
            assert isinstance(cn, X509.X509_Name_Entry), cn
            assert isinstance(cn.get_object(), ASN1.ASN1_Object), cn
            data = cn.get_data()
            assert isinstance(data, ASN1.ASN1_String), data
            t = data.as_text()
            assert t == "common name" or t == "Proxy", t
            l += 1
        assert l == 2, l

        cn.set_data("Hello There!")
        assert cn.get_data().as_text() == "Hello There!", cn.get_data().as_text()

        assert n.as_hash() == 1697185131
        
        self.assertRaises(IndexError, lambda: n[100])
        self.assert_(n[10])

    def test_mkreq(self):
        (req, _) = self.mkreq(1024)
        req.save_pem('tests/tmp_request.pem')
        req2 = X509.load_request('tests/tmp_request.pem')
        os.remove('tests/tmp_request.pem')
        req.save('tests/tmp_request.pem')
        req3 = X509.load_request('tests/tmp_request.pem')
        os.remove('tests/tmp_request.pem')
        req.save('tests/tmp_request.der', format=X509.FORMAT_DER)
        req4 = X509.load_request('tests/tmp_request.der',
                format=X509.FORMAT_DER)
        os.remove('tests/tmp_request.der')
        assert req.as_pem() == req2.as_pem()
        assert req.as_text() == req2.as_text()
        assert req.as_der() == req2.as_der()
        assert req.as_pem() == req3.as_pem()
        assert req.as_text() == req3.as_text()
        assert req.as_der() == req3.as_der()
        assert req.as_pem() == req4.as_pem()
        assert req.as_text() == req4.as_text()
        assert req.as_der() == req4.as_der()
        self.assertEqual(req.get_version(), 0)
        req.set_version(1)
        self.assertEqual(req.get_version(), 1)
        req.set_version(0)
        self.assertEqual(req.get_version(), 0)


    def test_mkcert(self):
        req, pk = self.mkreq(1024)
        pkey = req.get_pubkey()
        assert(req.verify(pkey))
        sub = req.get_subject()
        assert len(sub) == 2, len(sub)
        cert = X509.X509()
        cert.set_serial_number(1)
        cert.set_version(2)
        cert.set_subject(sub)
        t = long(time.time()) + time.timezone
        now = ASN1.ASN1_UTCTIME()
        now.set_time(t)
        nowPlusYear = ASN1.ASN1_UTCTIME()
        nowPlusYear.set_time(t + 60 * 60 * 24 * 365)
        cert.set_not_before(now)
        cert.set_not_after(nowPlusYear)
        assert str(cert.get_not_before()) == str(now)
        assert str(cert.get_not_after()) == str(nowPlusYear)
        issuer = X509.X509_Name()
        issuer.CN = 'The Issuer Monkey'
        issuer.O = 'The Organization Otherwise Known as My CA, Inc.'
        cert.set_issuer(issuer)
        cert.set_pubkey(pkey)
        cert.set_pubkey(cert.get_pubkey()) # Make sure get/set work
        ext = X509.new_extension('subjectAltName', 'DNS:foobar.example.com')
        ext.set_critical(0)
        assert ext.get_critical() == 0
        cert.add_ext(ext)
        cert.sign(pk, 'sha1')
        self.assertRaises(ValueError, cert.sign, pk, 'nosuchalgo')
        assert(cert.get_ext('subjectAltName').get_name() == 'subjectAltName')
        assert(cert.get_ext_at(0).get_name() == 'subjectAltName')
        assert(cert.get_ext_at(0).get_value() == 'DNS:foobar.example.com')
        assert cert.get_ext_count() == 1, cert.get_ext_count()
        self.assertRaises(IndexError, cert.get_ext_at, 1)
        assert cert.verify()
        assert cert.verify(pkey)
        assert cert.verify(cert.get_pubkey())
        assert cert.get_version() == 2
        assert cert.get_serial_number() == 1
        assert cert.get_issuer().CN == 'The Issuer Monkey' 
        
        if m2.OPENSSL_VERSION_NUMBER >= 0x90800f:
            assert not cert.check_ca()
            assert not cert.check_purpose(m2.X509_PURPOSE_SSL_SERVER, 1)
            assert not cert.check_purpose(m2.X509_PURPOSE_NS_SSL_SERVER, 1)
            assert cert.check_purpose(m2.X509_PURPOSE_SSL_SERVER, 0)
            assert cert.check_purpose(m2.X509_PURPOSE_NS_SSL_SERVER, 0)
            assert cert.check_purpose(m2.X509_PURPOSE_ANY, 0)            
        else:
            self.assertRaises(AttributeError, cert.check_ca)

    def mkcacert(self):
        req, pk = self.mkreq(1024, ca=1)
        pkey = req.get_pubkey()
        sub = req.get_subject()
        cert = X509.X509()
        cert.set_serial_number(1)
        cert.set_version(2)
        cert.set_subject(sub)
        t = long(time.time()) + time.timezone
        now = ASN1.ASN1_UTCTIME()
        now.set_time(t)
        nowPlusYear = ASN1.ASN1_UTCTIME()
        nowPlusYear.set_time(t + 60 * 60 * 24 * 365)
        cert.set_not_before(now)
        cert.set_not_after(nowPlusYear)
        issuer = X509.X509_Name()
        issuer.C = "UK"
        issuer.CN = "OpenSSL Group"
        cert.set_issuer(issuer)
        cert.set_pubkey(pkey) 
        ext = X509.new_extension('basicConstraints', 'CA:TRUE')
        cert.add_ext(ext)
        cert.sign(pk, 'sha1')

        if m2.OPENSSL_VERSION_NUMBER >= 0x0090800fL:
            assert cert.check_ca()
            assert cert.check_purpose(m2.X509_PURPOSE_SSL_SERVER, 1)
            assert cert.check_purpose(m2.X509_PURPOSE_NS_SSL_SERVER, 1)
            assert cert.check_purpose(m2.X509_PURPOSE_ANY, 1)
            assert cert.check_purpose(m2.X509_PURPOSE_SSL_SERVER, 0)
            assert cert.check_purpose(m2.X509_PURPOSE_NS_SSL_SERVER, 0)
            assert cert.check_purpose(m2.X509_PURPOSE_ANY, 0)
        else:
            self.assertRaises(AttributeError, cert.check_ca)
        
        return cert, pk, pkey

    def test_mkcacert(self): 
        cacert, pk, pkey = self.mkcacert()
        assert cacert.verify(pkey)
        

    def test_mkproxycert(self): 
        cacert, pk1, pkey = self.mkcacert()
        end_entity_cert_req, pk2 = self.mkreq(1024)
        end_entity_cert = self.make_eecert(cacert)
        end_entity_cert.set_subject(end_entity_cert_req.get_subject())
        end_entity_cert.set_pubkey(end_entity_cert_req.get_pubkey())
        end_entity_cert.sign(pk1, 'sha1')
        proxycert = self.make_proxycert(end_entity_cert)
        proxycert.sign(pk2, 'sha1')
        assert proxycert.verify(pk2)
        assert proxycert.get_ext_at(0).get_name() == 'proxyCertInfo', proxycert.get_ext_at(0).get_name()
        assert proxycert.get_ext_at(0).get_value() == 'Path Length Constraint: infinite\nPolicy Language: Inherit all\n', '"%s"' % proxycert.get_ext_at(0).get_value()
        assert proxycert.get_ext_count() == 1, proxycert.get_ext_count()
        assert proxycert.get_subject().as_text() == 'C=UK, CN=OpenSSL Group, CN=Proxy', proxycert.get_subject().as_text()
        assert proxycert.get_subject().as_text(indent=2, flags=m2.XN_FLAG_RFC2253) == '  CN=Proxy,CN=OpenSSL Group,C=UK', '"%s"' %  proxycert.get_subject().as_text(indent=2, flags=m2.XN_FLAG_RFC2253)

    def make_eecert(self, cacert):
        eecert = X509.X509()
        eecert.set_serial_number(2)
        eecert.set_version(2)
        t = long(time.time()) + time.timezone
        now = ASN1.ASN1_UTCTIME()
        now.set_time(t)
        now_plus_year = ASN1.ASN1_UTCTIME()
        now_plus_year.set_time(t + 60 * 60 * 24 * 365)
        eecert.set_not_before(now)
        eecert.set_not_after(now_plus_year)
        eecert.set_issuer(cacert.get_subject())
        return eecert
    
    def make_proxycert(self, eecert):
        proxycert = X509.X509()
        pk2 = EVP.PKey()
        proxykey =  RSA.gen_key(1024, 65537, self.callback)
        pk2.assign_rsa(proxykey)
        proxycert.set_pubkey(pk2)
        proxycert.set_version(2)
        not_before = ASN1.ASN1_UTCTIME()
        not_after = ASN1.ASN1_UTCTIME()
        not_before.set_time(int(time.time()))
        offset = 12 * 3600
        not_after.set_time(int(time.time()) + offset )
        proxycert.set_not_before(not_before)
        proxycert.set_not_after(not_after)
        proxycert.set_issuer_name(eecert.get_subject())
        proxycert.set_serial_number(12345678)
        proxy_subject_name = X509.X509_Name()
        issuer_name_string = eecert.get_subject().as_text()
        seq = issuer_name_string.split(",")

        subject_name = X509.X509_Name()
        for entry in seq:
            l = entry.split("=")
            subject_name.add_entry_by_txt(field=l[0].strip(),
                                          type=ASN1.MBSTRING_ASC,
                                          entry=l[1], len=-1, loc=-1, set=0)

        subject_name.add_entry_by_txt(field="CN", type=ASN1.MBSTRING_ASC,
                                      entry="Proxy", len=-1, loc=-1, set=0)


        proxycert.set_subject_name(subject_name)
        pci_ext = X509.new_extension("proxyCertInfo", 
                                     "critical,language:Inherit all", 1) # XXX leaks 8 bytes 
        proxycert.add_ext(pci_ext)
        return proxycert
    
    def test_fingerprint(self):
        x509 = X509.load_cert('tests/x509.pem')
        fp = x509.get_fingerprint('sha1')
        expected = '8D2EB9E203B5FFDC7F4FA7DC4103E852A55B808D'
        assert fp == expected, '%s != %s' % (fp, expected)

    def test_load_der_string(self):
        f = open('tests/x509.der', 'rb')
        x509 = X509.load_cert_der_string(''.join(f.readlines()))
        fp = x509.get_fingerprint('sha1')
        expected = '8D2EB9E203B5FFDC7F4FA7DC4103E852A55B808D'
        assert fp == expected, '%s != %s' % (fp, expected)

    def test_save_der_string(self):
        x509 = X509.load_cert('tests/x509.pem')
        s = x509.as_der()
        f = open('tests/x509.der', 'rb')
        s2 = f.read()
        f.close()
        assert s == s2

    def test_load(self):
        x509 = X509.load_cert('tests/x509.pem')
        x5092 = X509.load_cert('tests/x509.der', format=X509.FORMAT_DER)
        assert x509.as_text() == x5092.as_text()
        assert x509.as_pem() == x5092.as_pem()
        assert x509.as_der() == x5092.as_der()
        return
    
    def test_load_bio(self):
        bio = BIO.openfile('tests/x509.pem')
        bio2 = BIO.openfile('tests/x509.der')
        x509 = X509.load_cert_bio(bio)
        x5092 = X509.load_cert_bio(bio2, format=X509.FORMAT_DER)
        
        self.assertRaises(ValueError, X509.load_cert_bio, bio2, format=45678)

        assert x509.as_text() == x5092.as_text()
        assert x509.as_pem() == x5092.as_pem()
        assert x509.as_der() == x5092.as_der()
        return

    def test_load_string(self):
        f = open('tests/x509.pem')
        s = f.read()
        f.close()
        f2 = open('tests/x509.der', 'rb')
        s2 = f2.read()
        f2.close()
        x509 = X509.load_cert_string(s)
        x5092 = X509.load_cert_string(s2, X509.FORMAT_DER)
        assert x509.as_text() == x5092.as_text()
        assert x509.as_pem() == x5092.as_pem()
        assert x509.as_der() == x5092.as_der()
        return
    
    def test_load_request_bio(self):
        (req, _) = self.mkreq(1024)

        r1 = X509.load_request_der_string(req.as_der())
        r2 = X509.load_request_string(req.as_der(), X509.FORMAT_DER)
        r3 = X509.load_request_string(req.as_pem(), X509.FORMAT_PEM)

        r4 = X509.load_request_bio(BIO.MemoryBuffer(req.as_der()), X509.FORMAT_DER)
        r5 = X509.load_request_bio(BIO.MemoryBuffer(req.as_pem()), X509.FORMAT_PEM)

        for r in [r1, r2, r3, r4, r5]:
            assert req.as_der() == r.as_der()

        self.assertRaises(ValueError, X509.load_request_bio, BIO.MemoryBuffer(req.as_pem()), 345678)

    def test_save(self):
        x509 = X509.load_cert('tests/x509.pem')
        f = open('tests/x509.pem', 'r')
        lTmp = f.readlines()
        x509_pem = ''.join(lTmp[44:60]) # -----BEGIN CERTIFICATE----- : -----END CERTIFICATE-----
        f.close()
        f = open('tests/x509.der', 'rb')
        x509_der = f.read()
        f.close()
        x509.save('tests/tmpcert.pem')
        f = open('tests/tmpcert.pem')
        s = f.read()
        f.close()
        self.assertEquals(s, x509_pem)
        os.remove('tests/tmpcert.pem')
        x509.save('tests/tmpcert.der', format=X509.FORMAT_DER)
        f = open('tests/tmpcert.der', 'rb')
        s = f.read()
        f.close()
        self.assertEquals(s, x509_der)
        os.remove('tests/tmpcert.der')

    def test_malformed_data(self):
        self.assertRaises(X509.X509Error, X509.load_cert_string, 'Hello')
        self.assertRaises(X509.X509Error, X509.load_cert_der_string, 'Hello')
        self.assertRaises(X509.X509Error, X509.new_stack_from_der, 'Hello')
        self.assertRaises(X509.X509Error, X509.load_cert, 'tests/alltests.py')
        self.assertRaises(X509.X509Error, X509.load_request, 'tests/alltests.py')
        self.assertRaises(X509.X509Error, X509.load_request_string, 'Hello')
        self.assertRaises(X509.X509Error, X509.load_request_der_string, 'Hello')
        self.assertRaises(X509.X509Error, X509.load_crl, 'tests/alltests.py')
        
    def test_long_serial(self):
        from M2Crypto import X509
        cert = X509.load_cert('tests/long_serial_cert.pem')
        self.assertEquals(cert.get_serial_number(), 17616841808974579194)

        cert = X509.load_cert('tests/thawte.pem')
        self.assertEquals(cert.get_serial_number(), 127614157056681299805556476275995414779)


class X509_StackTestCase(unittest.TestCase):
    
    def test_make_stack_from_der(self):
        f = open("tests/der_encoded_seq.b64")
        b64 = f.read(1304)
        seq = base64.decodestring(b64)
        stack = X509.new_stack_from_der(seq)
        cert = stack.pop()
        assert stack.pop() is None
        
        cert.foobar = 1
        assert cert.foobar == 1
        
        subject = cert.get_subject() 
        assert str(subject) == "/DC=org/DC=doegrids/OU=Services/CN=host/bosshog.lbl.gov"

    def test_make_stack_check_num(self):
        f = open("tests/der_encoded_seq.b64")
        b64 = f.read(1304)
        seq = base64.decodestring(b64)
        stack = X509.new_stack_from_der(seq)
        num = len(stack)
        assert num == 1 
        cert = stack.pop() 
        num = len(stack)
        assert num == 0 
        subject = cert.get_subject() 
        assert str(subject) == "/DC=org/DC=doegrids/OU=Services/CN=host/bosshog.lbl.gov"

    def test_make_stack(self):
        stack = X509.X509_Stack()
        cert = X509.load_cert("tests/x509.pem")
        issuer = X509.load_cert("tests/ca.pem")
        cert_subject1 = cert.get_subject()
        issuer_subject1 = issuer.get_subject()
        stack.push(cert)
        stack.push(issuer)
        
        # Test stack iterator
        i = 0
        for c in stack:
            i += 1
            assert len(c.get_subject().CN) > 0
        assert i == 2
        
        issuer_pop = stack.pop() 
        cert_pop = stack.pop() 
        cert_subject2 = cert_pop.get_subject() 
        issuer_subject2 = issuer.get_subject()
        assert str(cert_subject1) == str(cert_subject2)
        assert str(issuer_subject1) == str(issuer_subject2)
    
    def test_as_der(self):
        stack = X509.X509_Stack()
        cert = X509.load_cert("tests/x509.pem")
        issuer = X509.load_cert("tests/ca.pem")
        cert_subject1 = cert.get_subject()
        issuer_subject1 = issuer.get_subject()
        stack.push(cert)
        stack.push(issuer)
        der_seq = stack.as_der() 
        stack2 = X509.new_stack_from_der(der_seq)
        issuer_pop = stack2.pop() 
        cert_pop = stack2.pop() 
        cert_subject2 = cert_pop.get_subject() 
        issuer_subject2 = issuer.get_subject()
        assert str(cert_subject1) == str(cert_subject2)
        assert str(issuer_subject1) == str(issuer_subject2)
        

class X509_ExtTestCase(unittest.TestCase):
    
    def test_ext(self):
        if 0: # XXX
            # With this leaks 8 bytes:
            name = "proxyCertInfo"
            value = "critical,language:Inherit all"
        else:
            # With this there are no leaks:
            name = "nsComment"
            value = "Hello"
        
        lhash = m2.x509v3_lhash()
        ctx = m2.x509v3_set_conf_lhash(lhash)
        x509_ext_ptr = m2.x509v3_ext_conf(lhash, ctx, name, value)
        x509_ext = X509.X509_Extension(x509_ext_ptr, 1)


class CRLTestCase(unittest.TestCase):
    def test_new(self):
        crl = X509.CRL()
        self.assertEqual(crl.as_text()[:34],
                         'Certificate Revocation List (CRL):')
    

def suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(X509TestCase))
    suite.addTest(unittest.makeSuite(X509_StackTestCase))
    suite.addTest(unittest.makeSuite(X509_ExtTestCase))
    suite.addTest(unittest.makeSuite(CRLTestCase))
    return suite


if __name__ == '__main__':
    Rand.load_file('randpool.dat', -1)
    unittest.TextTestRunner().run(suite())
    Rand.save_file('randpool.dat')