This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
u-auth/uauth/organization/management/commands/sync_federations.py

117 lines
4.9 KiB
Python

import os
import json
import subprocess
import xml.etree.ElementTree as etree
import requests
import urlparse
import re
import lasso
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
def md_element_name(tag_name):
return '{%s}%s' % (lasso.SAML2_METADATA_HREF, tag_name)
def mdui_element_name(tag_name):
return '{%s}%s' % (SAML2_METADATA_UI_HREF, tag_name)
SAML2_METADATA_UI_HREF = 'urn:oasis:names:tc:SAML:metadata:ui'
ENTITY_DESCRIPTOR_TN = md_element_name('EntityDescriptor')
ENTITIES_DESCRIPTOR_TN = md_element_name('EntitiesDescriptor')
IDP_SSO_DESCRIPTOR_TN = md_element_name('IDPSSODescriptor')
SP_SSO_DESCRIPTOR_TN = md_element_name('SPSSODescriptor')
ORGANIZATION_DISPLAY_NAME = md_element_name('OrganizationDisplayName')
ORGANIZATION_NAME = md_element_name('OrganizationName')
ORGANIZATION = md_element_name('Organization')
EXTENSIONS = md_element_name('Extensions')
UI_INFO = mdui_element_name('UIInfo')
DISPLAY_NAME = mdui_element_name('DisplayName')
ENTITY_ID = 'entityID'
PROTOCOL_SUPPORT_ENUMERATION = 'protocolSupportEnumeration'
def check_support_saml2(tree):
if tree is not None and lasso.SAML2_PROTOCOL_HREF in tree.get(PROTOCOL_SUPPORT_ENUMERATION):
return True
return False
def verify_metadata(codename, signcert):
metadata = metadata_filename(codename, 'downloaded')
if not signcert:
print 'warn: do not verify %s metadata (no certificate provided)' % codename
ret = True
else:
signcert_pem = metadata_filename(codename, 'signcert.pem')
dir = os.path.join(METADATAS_DIR, codename)
f = open(signcert_pem, 'wb')
f.write(signcert)
f.close()
ret = 0 == subprocess.call(['xmlsec1', '--verify',
'--id-attr:ID', 'EntitiesDescriptor',
'--pubkey-cert-pem', signcert_pem,
'--enabled-key-data', 'key-name',
metadata])
if ret:
os.rename(metadata, metadata_filename(codename))
else:
print 'warn: bad signature for %s metadata' % codename
os.rename(metadata, metadata_filename(codename, 'bad_signature'))
return ret
class Command(BaseCommand):
def handle(self, *args, **options):
idps = []
if not os.path.exists(settings.METADATAS_DIR):
os.mkdir(settings.METADATAS_DIR)
for metadata_uri in settings.METADATA_URIS:
metadata = requests.get(metadata_uri)
url = urlparse.urlparse(metadata_uri)
metadata_file_path = os.path.join(settings.METADATAS_DIR,
url.path.split('/')[-1])
with open(metadata_file_path, 'w') as metadata_file:
metadata_file.write(metadata.content)
idp_dir, ext = os.path.splitext(metadata_file_path)
if not os.path.exists(idp_dir):
os.mkdir(os.path.join(settings.METADATAS_DIR, idp_dir))
doc = etree.parse(metadata_file_path)
if doc.getroot().tag == ENTITIES_DESCRIPTOR_TN:
entity_descriptors = doc.getroot().findall(ENTITY_DESCRIPTOR_TN)
for entity_descriptor in entity_descriptors:
idp = check_support_saml2(entity_descriptor.find(IDP_SSO_DESCRIPTOR_TN))
if not idp:
continue
entity_id = entity_descriptor.get(ENTITY_ID)
name = None
display_name = entity_descriptor.find('.//%s/%s/%s' % (EXTENSIONS, UI_INFO, DISPLAY_NAME))
if display_name is not None:
name = display_name.text
if not name:
organization = entity_descriptor.find(ORGANIZATION)
if organization is not None:
organization_display_name = organization.find(ORGANIZATION_DISPLAY_NAME)
organization_name = organization.find(ORGANIZATION_NAME)
if organization_display_name is not None:
name = organization_display_name.text
elif organization_name is not None:
name = organization_name.text
if not name:
name = entity_id
idp = entity_descriptor.find(IDP_SSO_DESCRIPTOR_TN)
entity_id = entity_descriptor.get(ENTITY_ID)
entities = etree.tostring(entity_descriptor)
metadata_dest_filename = os.path.join(idp_dir, '%s.xml' % slugify(name))
with open(metadata_dest_filename, 'w') as metadata:
metadata.write(entities)
idps.append({'METADATA': os.path.abspath(metadata_dest_filename),
'ENTITY_ID': entity_id,
'NAME': name})
with open(os.path.join(settings.METADATAS_DIR, 'idps.json'), 'w') as idps_file:
json.dump(idps, idps_file)