saml2: remove authentic2 dependency and integrate saml2utils into

mandaye
This commit is contained in:
Jérôme Schneider 2013-05-22 16:45:44 +02:00
parent 53ab2c5b82
commit 0ea966dadb
7 changed files with 652 additions and 32 deletions

View File

@ -38,24 +38,12 @@ Installation
Dependencies
------------
You must install the following packages to use Mandaye
* Python >= 2.5:: http://python.org/
* Setuptools >= 0.6:: http://pypi.python.org/pypi/setuptools
* Gunicorn >= 0.13:: http://pypi.python.org/pypi/gunicorn
* Poster >= 0.8:: http://pypi.python.org/pypi/poster/
* SQLAlchemy >= 0.7:: http://pypi.python.org/pypi/SQLAlchemy
* Beaker >= 1.6:: http://pypi.python.org/pypi/Beaker
* Mako >= 0.4:: http://pypi.python.org/pypi/Mako
* lxml >= 2.3:: http://pypi.python.org/pypi/lxml
* xtraceback >= 0.3:: http://pypi.python.org/pypi/xtraceback
* sqlalchemy-migrate:: http://pypi.python.org/pypi/sqlalchemy-migrate
The dependencies are in define into requirements.txt
Quick installation
------------------
Install at least Python >=2.5 and pip in your system.
Install at least Python >=2.6 and pip in your system.
For example with Debian or a Debian based distribution::
sudo apt-get install python python-pip
@ -87,29 +75,40 @@ Quick Start
First step is to create a mandaye project::
$ mandaye_admin.py --newproject
$ mandaye --newproject PROJECT_NAME
$ cd PROJECT_NAME
Configure your project (look configuration section)::
$ touch PROJECT_NAME/local_config.py
$ vi PROJECT_NAME/local_config.py
Install your Mandaye project::
$ python setup.py install
$ PROJECT_NAME_manager --createdb
Launch mandaye server::
$ mandaye_server.py
$ PROJECT_NAME_server
mandaye_server.py use gunicorn and gunicorn options (please read http://gunicorn.org/configure.html)
You could also use gunicorn.conf.py-sample (in the mandaye files)::
$ mandaye_server.py -c PATH_TO_THE_FILE/gunicorn.conf.py
$ PROJECT_NAME_server -c PATH_TO_THE_FILE/gunicorn.conf.py
or::
$ mandaye_server.py -c PATH_TO_THE_FILE/gunicorn.conf.py -b 0.0.0.0:4242
$ PROJECT_NAME_server -c PATH_TO_THE_FILE/gunicorn.conf.py -b 0.0.0.0:4242
Upgrade Mandaye
---------------
You need to upgrade the update the repository and upgrade database::
~/mandaye $ git pull
~/mandaye $ ./mandaye_migrate.py upgrade
~/new_mandaye $ python setup.py install
~/new_mandaye $ alembic upgrade head
Configuration
=============

View File

@ -5,17 +5,16 @@ import urllib2
import lasso
from authentic2.saml import saml2utils
from urlparse import parse_qs
from mandaye import config, utils
from mandaye.saml import saml2utils
from mandaye.auth.authform import AuthForm
from mandaye.response import _302, _500
from mandaye.log import logger
from mandaye.template import serve_template
from mandaye.http import HTTPResponse, HTTPHeader
class SAML2Auth(AuthForm):
""" SAML 2 authentification
"""

0
mandaye/saml/__init__.py Normal file
View File

429
mandaye/saml/saml2utils.py Normal file
View File

@ -0,0 +1,429 @@
import xml.etree.ElementTree as etree
import lasso
import x509utils
import base64
import binascii
import re
import datetime
import time
def filter_attribute_private_key(message):
return re.sub(r' (\w+:)?(PrivateKey=")([&#;\w/ +-=])+(")', '', message)
def filter_element_private_key(message):
return re.sub(r'(<saml)(p)?(:PrivateKeyFile>-----BEGIN RSA PRIVATE KEY-----)'
'([&#;\w/+=\s])+'
'(-----END RSA PRIVATE KEY-----</saml)(p)?(:PrivateKeyFile>)',
'', message)
def bool2xs(boolean):
'''Convert a boolean value to XSchema boolean representation'''
if boolean is True:
return 'true'
if boolean is False:
return 'false'
raise TypeError()
def int_to_b64(i):
h = hex(i)[2:].strip('L')
if len(h) % 2 == 1:
h = '0' + h
return base64.b64encode(binascii.unhexlify(h))
def keyinfo(tb, key):
tb.pushNamespace(lasso.DS_HREF)
tb.start('KeyInfo', {})
if 'CERTIF' in key:
naked = x509utils.decapsulate_pem_file(key)
tb.start('X509Data', {})
tb.start('X509Certificate', {})
tb.data(naked)
tb.end('X509Certificate')
tb.end('X509Data')
else:
tb.start('KeyValue', {})
tb.start('RSAKeyValue', {})
tb.start('Modulus', {})
tb.data(int_to_b64(x509utils.get_rsa_public_key_modulus(key)))
tb.end('Modulus')
tb.start('Exponent', {})
tb.data(int_to_b64(x509utils.get_rsa_public_key_exponent(key)))
tb.end('Exponent')
tb.end('RSAKeyValue')
tb.end('KeyValue')
tb.end('KeyInfo')
tb.popNamespace()
class NamespacedTreeBuilder(etree.TreeBuilder):
def __init__(self, *args, **kwargs):
self.__old_ns = []
self.__ns = None
self.__opened = []
return etree.TreeBuilder.__init__(self, *args, **kwargs)
def pushNamespace(self, ns):
self.__old_ns.append(self.__ns)
self.__ns = ns
def popNamespace(self):
self.__ns = self.__old_ns.pop()
def start(self, tag, attrib):
tag = '{%s}%s' % (self.__ns, tag)
self.__opened.append(tag)
return etree.TreeBuilder.start(self, tag, attrib)
def simple_content(self, tag, data):
self.start(tag, {})
self.data(data)
self.end()
def end(self, tag = None):
if tag:
self.__opened.pop()
tag = '{%s}%s' % (self.__ns, tag)
else:
tag = self.__opened.pop()
return etree.TreeBuilder.end(self, tag)
class Saml2Metadata(object):
ENTITY_DESCRIPTOR = 'EntityDescriptor'
SP_SSO_DESCRIPTOR = 'SPSSODescriptor'
IDP_SSO_DESCRIPTOR = 'IDPSSODescriptor'
ARTIFACT_RESOLUTION_SERVICE = 'ArtifactResolutionService'
SINGLE_LOGOUT_SERVICE = 'SingleLogoutService'
MANAGE_NAME_ID_SERVICE = 'ManageNameIDService'
SINGLE_SIGN_ON_SERVICE = 'SingleSignOnService'
NAME_ID_MAPPING_SERVICE = 'NameIDMappingService'
ASSERTION_ID_REQUEST_SERVICE = 'AssertionIDRequestService'
ASSERTION_CONSUMER_SERVICE = 'AssertionConsumerService'
PROTOCOL_SUPPORT_ENUMERATION = 'protocolSupportEnumeration'
KEY_DESCRIPTOR = 'KeyDescriptor'
EXTENSIONS = 'Extensions'
DISCOVERY_RESPONSE = 'DiscoveryResponse'
DISCOVERY_NS = 'urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol'
DISCOVERY_BINDING = 'urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol'
sso_services = ( ARTIFACT_RESOLUTION_SERVICE, SINGLE_LOGOUT_SERVICE,
MANAGE_NAME_ID_SERVICE )
idp_services = ( SINGLE_SIGN_ON_SERVICE, NAME_ID_MAPPING_SERVICE,
ASSERTION_ID_REQUEST_SERVICE )
sp_services = ( ASSERTION_CONSUMER_SERVICE, )
indexed_endpoints = ( ARTIFACT_RESOLUTION_SERVICE,
ASSERTION_CONSUMER_SERVICE )
def __init__(self, entity_id, url_prefix = '', valid_until = None,
cache_duration = None):
'''Initialize a new generator for a metadata file.
Entity id is the name of the provider
'''
self.entity_id = entity_id
self.url_prefix = url_prefix
self.role_descriptors = {}
self.valid_until = valid_until
self.cache_duration = cache_duration
self.tb = NamespacedTreeBuilder()
self.tb.pushNamespace(lasso.SAML2_METADATA_HREF)
def add_role_descriptor(self, role, map, options):
'''Add a role descriptor, map is a sequence of tuples formatted as
(endpoint_type, (bindings, ..) , url [, return_url])
endpoint_type is a string among:
- SingleSignOnService
- AssertionConsumer
- SingleLogoutService
- ManageNameIDService
- AuthzService
- AuthnQueryService
- AttributeService
- AssertionIDRequestService'''
self.role_descriptors[role] = (map, options)
def add_sp_descriptor(self, map, options):
for row in map:
if row[0] not in self.sp_services + self.sso_services:
raise TypeError()
self.add_role_descriptor('sp', map, options)
def add_idp_descriptor(self, map, options):
for row in map:
if row[0] not in self.idp_services + self.sso_services:
raise TypeError()
self.add_role_descriptor('idp', map, options)
def generate_services(self, map, options, listing):
if options:
if 'NameIDFormat' in options:
for name_id_format in options['NameIDFormat']:
self.tb.start('NameIDFormat', {})
self.tb.data(name_id_format)
self.tb.end('NameIDFormat')
if 'signing_key' in options:
self.add_keyinfo(options['signing_key'], 'signing')
if 'encryption_key' in options:
self.add_keyinfo(options['encryption_key'], 'encryption')
if 'key' in options:
self.add_keyinfo(options['key'], None)
if 'disco' in options:
self.add_disco_extension(options['disco'])
assertion_consumer_idx = 1
for service in listing:
selected = [ row for row in map if row[0] == service ]
for row in selected:
if isinstance(row[1], str):
bindings = [ row[1] ]
else:
bindings = row[1]
for binding in bindings:
attribs = { 'Binding' : binding,
'Location': self.url_prefix + row[2] }
if len(row) == 4:
attribs['ResponseLocation'] = self.url_prefix + row[3]
if service in self.indexed_endpoints:
if len(row) == 5:
if row[4] is True:
attribs['isDefault'] = 'true'
if row[4] is False:
attribs['isDefault'] = 'false'
attribs['index'] = str(assertion_consumer_idx)
assertion_consumer_idx += 1
self.tb.start(service, attribs)
self.tb.end(service)
def add_keyinfo(self, key, use):
attrib = {}
if use:
attrib = { 'use': use }
self.tb.start(self.KEY_DESCRIPTOR, attrib)
keyinfo(self.tb, key)
self.tb.end(self.KEY_DESCRIPTOR)
def root_element(self):
attrib = { 'entityID' : self.entity_id}
if self.cache_duration:
attrib['cacheDuration'] = self.cache_duration
if self.valid_until:
attrib['validUntil'] = self.valid_until
self.entity_descriptor = self.tb.start(self.ENTITY_DESCRIPTOR, attrib)
# Generate sso descriptor
attrib = { self.PROTOCOL_SUPPORT_ENUMERATION: lasso.SAML2_PROTOCOL_HREF }
if self.role_descriptors.get('sp'):
map, options = self.role_descriptors['sp']
self.sp_descriptor = self.tb.start(self.SP_SSO_DESCRIPTOR, attrib)
self.generate_services(map, options, self.sso_services)
self.generate_services(map, {}, self.sp_services)
self.tb.end(self.SP_SSO_DESCRIPTOR)
if self.role_descriptors.get('idp'):
map, options = self.role_descriptors['idp']
self.sp_descriptor = self.tb.start(self.IDP_SSO_DESCRIPTOR, attrib)
self.generate_services(map, options, self.sso_services)
self.generate_services(map, {}, self.idp_services)
self.tb.end(self.IDP_SSO_DESCRIPTOR)
self.tb.end(self.ENTITY_DESCRIPTOR)
return self.tb.close()
def add_disco_extension(self, disco_return_url):
self.tb.start(self.EXTENSIONS, {})
self.tb.pushNamespace(self.DISCOVERY_NS)
index = 1
for url in disco_return_url:
attrib = {'Binding': self.DISCOVERY_BINDING,
'Location': self.url_prefix + url,
'index': str(index)}
self.tb.start(self.DISCOVERY_RESPONSE, attrib)
self.tb.end(self.DISCOVERY_RESPONSE)
index += 1
self.tb.popNamespace()
self.tb.end(self.EXTENSIONS)
def __str__(self):
return '<?xml version="1.0"?>\n' + etree.tostring(self.root_element())
def iso8601_to_datetime(date_string):
'''Convert a string formatted as an ISO8601 date into a time_t value.
This function ignores the sub-second resolution'''
m = re.match(r'(\d+-\d+-\d+T\d+:\d+:\d+)(?:\.\d+)?Z$', date_string)
if not m:
raise ValueError('Invalid ISO8601 date')
tm = time.strptime(m.group(1)+'Z', "%Y-%m-%dT%H:%M:%SZ")
return datetime.datetime.fromtimestamp(time.mktime(tm))
def authnresponse_checking(login, subject_confirmation, logger, saml_request_id=None):
logger.debug('authnresponse_checking: beginning...')
# If there is no inResponseTo: IDP initiated
# else, check that the response id is the same
assertion = login.assertion
if not assertion:
logger.error('authnresponse_checking: Assertion missing')
return False
logger.debug('authnresponse_checking: assertion %s' % assertion.dump())
irt = None
try:
irt = assertion.subject. \
subjectConfirmation.subjectConfirmationData.inResponseTo
except:
pass
logger.debug('authnresponse_checking: inResponseTo: %s' % irt)
if irt and (not saml_request_id or saml_request_id != irt):
logger.error('authnresponse_checking: Request and Response ID do not match')
return False
# Check: SubjectConfirmation
try:
if assertion.subject.subjectConfirmation.method != \
'urn:oasis:names:tc:SAML:2.0:cm:bearer':
logger.error('authnresponse_checking: Unknown \
SubjectConfirmation Method')
return False
except:
logger.error('authnresponse_checking: Error checking \
SubjectConfirmation Method')
return False
logger.debug('authnresponse_checking: subjectConfirmation method known')
# Check: Check that the url is the same as in the assertion
try:
if assertion.subject. \
subjectConfirmation.subjectConfirmationData.recipient != \
subject_confirmation:
logger.error('authnresponse_checking: SubjectConfirmation \
Recipient Mismatch, %s is not %s' % (assertion.subject. \
subjectConfirmation.subjectConfirmationData.recipient,
subject_confirmation))
return False
except:
logger.error('authnresponse_checking: Error checking \
SubjectConfirmation Recipient')
return False
logger.debug('authnresponse_checking: \
the url is the same as in the assertion')
# Check: AudienceRestriction
try:
audience_ok = False
for audience_restriction in assertion.conditions.audienceRestriction:
if audience_restriction.audience != login.server.providerId:
logger.error('authnresponse_checking: Incorrect AudienceRestriction')
return False
audience_ok = True
if not audience_ok:
logger.error('authnresponse_checking: Incorrect AudienceRestriction')
return False
except:
logger.error('authnresponse_checking: Error checking AudienceRestriction')
return False
logger.debug('authnresponse_checking: audience restriction respected')
# Check: notBefore, notOnOrAfter
now = datetime.datetime.utcnow()
try:
not_before = assertion.subject. \
subjectConfirmation.subjectConfirmationData.notBefore
except:
logger.error('authnresponse_checking: missing subjectConfirmationData')
return False
not_on_or_after = assertion.subject.subjectConfirmation. \
subjectConfirmationData.notOnOrAfter
if irt:
if not_before is not None:
logger.error('authnresponse_checking: assertion in response to an AuthnRequest, \
notBefore MUST not be present in SubjectConfirmationData')
return False
elif not_before is not None and not not_before.endswith('Z'):
logger.error('authnresponse_checking: invalid notBefore value ' + not_before)
return False
if not_on_or_after is None or not not_on_or_after.endswith('Z'):
logger.error('authnresponse_checking: invalid notOnOrAfter format')
return False
try:
if not_before and now < iso8601_to_datetime(not_before):
logger.error('authnresponse_checking: Assertion received too early')
return False
except:
logger.error('authnresponse_checking: invalid notBefore value ' + not_before)
return False
try:
if not_on_or_after and now > iso8601_to_datetime(not_on_or_after):
logger.error('authnresponse_checking: Assertion expired')
return False
except:
logger.error('authnresponse_checking: invalid notOnOrAfter value')
return False
logger.debug('authnresponse_checking: assertion validity timeslice respected \
%s <= %s < %s ' % (not_before, str(now), not_on_or_after))
return True
def get_attributes_from_assertion(assertion, logger):
attributes = dict()
if not assertion:
return attributes
for att_statement in assertion.attributeStatement:
for attribute in att_statement.attribute:
name = None
format = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC
nickname = None
try:
name = attribute.name.decode('ascii')
except:
logger.warning('get_attributes_from_assertion: error decoding name of \
attribute %s' % attribute.dump())
else:
try:
if attribute.nameFormat:
format = attribute.nameFormat.decode('ascii')
if attribute.friendlyName:
nickname = attribute.friendlyName
except Exception, e:
message = 'get_attributes_from_assertion: name or format of an \
attribute failed to decode as ascii: %s due to %s'
logger.warning(message % (attribute.dump(), str(e)))
try:
values = attribute.attributeValue
if values:
attributes[(name, format)] = []
if nickname:
attributes[nickname] = attributes[(name, format)]
for value in values:
content = [any.exportToXml() for any in value.any]
content = ''.join(content)
attributes[(name, format)].append(content.\
decode('utf8'))
except Exception, e:
message = 'get_attributes_from_assertion: value of an \
attribute failed to decode as ascii: %s due to %s'
logger.warning(message % (attribute.dump(), str(e)))
attributes['__issuer'] = assertion.issuer.content
attributes['__nameid'] = assertion.subject.nameID.content
return attributes
if __name__ == '__main__':
pkey, _ = x509utils.generate_rsa_keypair()
meta = Saml2Metadata('http://example.com/saml', 'http://example.com/saml/prefix/')
bindings2 = [ lasso.SAML2_METADATA_BINDING_SOAP,
lasso.SAML2_METADATA_BINDING_REDIRECT,
lasso.SAML2_METADATA_BINDING_POST ]
options = { 'signing_key': pkey }
meta.add_sp_descriptor((
('SingleLogoutService',
lasso.SAML2_METADATA_BINDING_SOAP, 'logout', 'logoutReturn' ),
('ManageNameIDService',
bindings2, 'manageNameID', 'manageNameIDReturn' ),
('AssertionConsumerService',
[ lasso.SAML2_METADATA_BINDING_POST ], 'acs'),),
options)
root = meta.root_element()
print etree.tostring(root)

203
mandaye/saml/x509utils.py Normal file
View File

@ -0,0 +1,203 @@
import base64
import binascii
import tempfile
import os
import subprocess
import stat
_openssl = 'openssl'
def decapsulate_pem_file(file_or_string):
'''Remove PEM header lines'''
if not isinstance(file_or_string, basestring):
content = file_or_string.read()
else:
content = file_or_string
i = content.find('--BEGIN')
j = content.find('\n', i)
k = content.find('--END', j)
l = content.rfind('\n', 0, k)
return content[j+1:l]
def _call_openssl(args):
'''Use subprocees to spawn an openssl process
Return a tuple made of the return code and the stdout output
'''
try:
process = subprocess.Popen(args=[_openssl]+args,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
output = process.communicate()[0]
return process.returncode, output
except OSError:
return 1, None
def _protect_file(fd,filepath):
'''Make a file targeted by a file descriptor readable only by the current user
It's needed to be sure nobody can read the private key file we manage.
'''
if hasattr(os, 'fchmod'):
os.fchmod(fd, stat.S_IRUSR | stat.S_IWUSR)
else: # handle python <2.6
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
def check_key_pair_consistency(publickey=None,privatekey=None):
'''Check if two PEM key pair whether they are publickey or certificate, are
well formed and related.
'''
if publickey and privatekey:
try:
privatekey_file_fd, privatekey_fn = tempfile.mkstemp()
publickey_file_fd, publickey_fn = tempfile.mkstemp()
_protect_file(privatekey_file_fd, privatekey_fn)
_protect_file(publickey_file_fd, publickey_fn)
os.fdopen(privatekey_file_fd,'w').write(privatekey)
os.fdopen(publickey_file_fd,'w').write(publickey)
if 'BEGIN CERTIFICATE' in publickey:
rc1, modulus1 = _call_openssl(['x509', '-in', publickey_fn,'-noout','-modulus'])
else:
rc1, modulus1 = _call_openssl(['rsa', '-pubin', '-in', publickey_fn,'-noout','-modulus'])
if rc1 != 0:
rc1, modulus1 = _call_openssl(['dsa', '-pubin', '-in', publickey_fn,'-noout','-modulus'])
if rc1 != 0:
return False
rc2, modulus2 = _call_openssl(['rsa', '-in', privatekey_fn,'-noout','-modulus'])
if rc2 != 0:
rc2, modulus2 = _call_openssl(['dsa', '-in', privatekey_fn,'-noout','-modulus'])
if rc1 == 0 and rc2 == 0 and modulus1 == modulus2:
return True
else:
return False
finally:
os.unlink(privatekey_fn)
os.unlink(publickey_fn)
return None
def generate_rsa_keypair(numbits=1024):
'''Generate simple RSA public and private key files
'''
try:
privatekey_file_fd, privatekey_fn = tempfile.mkstemp()
publickey_file_fd, publickey_fn = tempfile.mkstemp()
_protect_file(privatekey_file_fd, privatekey_fn)
_protect_file(publickey_file_fd, publickey_fn)
rc1, _ = _call_openssl(['genrsa','-out', privatekey_fn,'-passout', 'pass:',str(numbits)])
rc2, _ = _call_openssl(['rsa','-in', privatekey_fn,'-pubout','-out', publickey_fn])
if rc1 != 0 or rc2 != 0:
raise Exception('Failed to generate a key')
return (os.fdopen(publickey_file_fd).read(), os.fdopen(privatekey_file_fd).read())
finally:
os.unlink(privatekey_fn)
os.unlink(publickey_fn)
def get_rsa_public_key_modulus(publickey):
try:
publickey_file_fd, publickey_fn = tempfile.mkstemp()
os.fdopen(publickey_file_fd,'w').write(publickey)
if 'BEGIN PUBLIC' in publickey:
rc, modulus = _call_openssl(['rsa', '-pubin', '-in', publickey_fn,'-noout','-modulus'])
elif 'BEGIN RSA PRIVATE KEY' in publickey:
rc, modulus = _call_openssl(['rsa', '-in', publickey_fn, '-noout', '-modulus'])
elif 'BEGIN CERTIFICATE' in publickey:
rc, modulus = _call_openssl(['x509', '-in', publickey_fn,'-noout','-modulus'])
else:
return None
i = modulus.find('=')
if rc == 0 and i:
return int(modulus[i+1:].strip(),16)
finally:
os.unlink(publickey_fn)
return None
def get_rsa_public_key_exponent(publickey):
try:
publickey_file_fd, publickey_fn = tempfile.mkstemp()
os.fdopen(publickey_file_fd,'w').write(publickey)
_exponent = 'Exponent: '
if 'BEGIN PUBLIC' in publickey:
rc, modulus = _call_openssl(['rsa', '-pubin', '-in', publickey_fn,'-noout','-text'])
elif 'BEGIN RSA PRIVATE' in publickey:
rc, modulus = _call_openssl(['rsa', '-in', publickey_fn, '-noout', '-text'])
_exponent = 'publicExponent: '
elif 'BEGIN CERTIFICATE' in publickey:
rc, modulus = _call_openssl(['x509', '-in', publickey_fn,'-noout','-text'])
else:
return None
i = modulus.find(_exponent)
j = modulus.find('(', i)
if rc == 0 and i and j:
return int(modulus[i+len(_exponent):j].strip())
finally:
os.unlink(publickey_fn)
return None
def can_generate_rsa_key_pair():
syspath = os.environ.get('PATH')
if syspath:
for base in syspath.split(':'):
if os.path.exists(os.path.join(base,'openssl')):
return True
else:
return False
def get_xmldsig_rsa_key_value(publickey):
def int_to_bin(i):
h = hex(i)[2:].strip('L')
if len(h) % 2 == 1:
h = '0' + h
return binascii.unhexlify(h)
mod = get_rsa_public_key_modulus(publickey)
exp = get_rsa_public_key_exponent(publickey)
return '<RSAKeyValue xmlns="http://www.w3.org/2000/09/xmldsig#">\n\t<Modulus>%s</Modulus>\n\t<Exponent>%s</Exponent>\n</RSAKeyValue>' % (base64.b64encode(int_to_bin(mod)), base64.b64encode(int_to_bin(exp)))
if __name__ == '__main__':
assert(can_generate_rsa_key_pair())
publickey, privatekey = generate_rsa_keypair()
assert(publickey is not None and privatekey is not None)
assert(check_key_pair_consistency(publickey, privatekey))
_, privatekey = generate_rsa_keypair()
assert(not check_key_pair_consistency(publickey, privatekey))
assert(get_xmldsig_rsa_key_value(publickey) is not None)
assert(get_rsa_public_key_modulus(publickey) is not None)
assert(get_rsa_public_key_exponent(publickey) is not None)
# Certificate/key generated using
# openssl req -x509 -newkey rsa:1024 -keyout key.pem -out req.pem
cert = '''-----BEGIN CERTIFICATE-----
MIICHjCCAYegAwIBAgIJALgmNSS3spUaMA0GCSqGSIb3DQEBBQUAMBUxEzARBgNV
BAoTCkVudHJvdXZlcnQwHhcNMDkxMDI4MjIwODEzWhcNMDkxMTI3MjIwODEzWjAV
MRMwEQYDVQQKEwpFbnRyb3V2ZXJ0MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQCtTbDTe/LrD+gvK0Sgf/rnvAg4zcc/vJcEdsiGsJ3shTse7OPf5fIaD7lry+jm
tFX61n8Rn1d1iw+whuYbrG6R3OhDw50vufb2RrRSHBOA7CcfiKQD6CT2p31msv+C
iHbGmoHRFyt2CnRGy2FCX2Oizf5qxfjHaJEXu0tk/SdN2QIDAQABo3YwdDAdBgNV
HQ4EFgQUlDrrh8KudeyeInXqios+Rdf9tQAwRQYDVR0jBD4wPIAUlDrrh8Kudeye
InXqios+Rdf9tQChGaQXMBUxEzARBgNVBAoTCkVudHJvdXZlcnSCCQC4JjUkt7KV
GjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAFHXBDW13NIiafS2cRP1
/KAMIfnB/kYINTUU7iv2oIOYtfpVR9yMmnLIVxTyN3rCWb7UV/ICkMotTHmKLDT8
Rp7tKc0zTQ+CQGFVYvfRAlz4kgW14DDx/oIBqr/yDv5mInFb8reSfP85cPrXp/wR
ufewZ2WHikP2kWoHWDkw8MDd
-----END CERTIFICATE-----'''
key = '''-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQCtTbDTe/LrD+gvK0Sgf/rnvAg4zcc/vJcEdsiGsJ3shTse7OPf
5fIaD7lry+jmtFX61n8Rn1d1iw+whuYbrG6R3OhDw50vufb2RrRSHBOA7CcfiKQD
6CT2p31msv+CiHbGmoHRFyt2CnRGy2FCX2Oizf5qxfjHaJEXu0tk/SdN2QIDAQAB
AoGBAKlFVQ17540JAHPyAxnxZxSpaC5zb8YlYiwOCVblc5rtlw1hvEGYy5wA987+
YAHW6pQSphKEXFyG81Asst0c0vExgGVFjzAy/GFrBTnl0l5PtwPDDIAmGP6DQw4C
lOHJePloKp0xjCo2nJ8XluxkPp1+XtJyJOhZWpQPDvF3uL+xAkEA3t58jg0SV55s
E10R04QOJB0qIB9U4Nw29uhh5RXv8JRq41pw4iDmpi9I67nGqDeuxlDUQ/+5rLOE
Ptp07BsFWwJBAMcQ7wiwhIYtRC8ff3WbWX9wcABDyX47uYvAMIiaEOmFmJyI41mW
xlik821Aaid1Z45vgBN32hYkEbpWaaIVe9sCQQCX7mpQ2F5ptskMhkTxwbN2MR+X
mGRfiiA6P/8EkejpQ/R+GxibPzydi9yVPidMY/FUpqOd24YzUonT408T6fPDAkEA
pkkt86tIOLEtaNO97CcF/t+Un5QAh9MqLmQv5pwUDo4Lqo7qo1bAfyHjOlr5kdaP
17qqWRjf82jT6jzu5nddywJAVQpxlZ8fIZUzTD2mRQeLf5O+rXmtH1LlwRRGCNaa
8eM47A92x9uplD/sN550pTKM7XLhHBvEfLujUoGHpWQxGA==
-----END RSA PRIVATE KEY-----'''
assert(check_key_pair_consistency(cert, key))
assert(get_xmldsig_rsa_key_value(cert))
assert(len(decapsulate_pem_file(key).splitlines()) == len(key.splitlines())-2)

View File

@ -1,10 +0,0 @@
#!/usr/bin/env python
import os
import mandaye.migration
from mandaye import config
if config.db_url:
from migrate.versioning.shell import main
main(url=config.db_url, debug='False', repository=mandaye.migration.__path__[0])