106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
import re
|
|
import os
|
|
|
|
import lasso
|
|
|
|
from quixote import get_publisher, get_request
|
|
|
|
from qommon.misc import get_abs_path
|
|
|
|
from hosts import Host
|
|
|
|
def get_root_url():
    """Return the root URL of the current request.

    The URL is assembled from the request scheme, the request server and
    the ``SCRIPT_NAME`` entry of the request environment.
    """
    request = get_request()
    url_parts = (
        request.get_scheme(),
        request.get_server(),
        request.environ['SCRIPT_NAME'],
    )
    return '%s://%s%s' % url_parts
|
|
|
|
def get_proxied_site_path():
    """Return the ``site_dir`` of the host matching the current URL.

    Returns None when ``Host.get_host_from_url()`` yields no host.
    """
    current_host = Host.get_host_from_url()
    return None if current_host is None else current_host.site_dir
|
|
|
|
def get_proxied_site_domain():
    """Return the server name of the current request without its port.

    Everything from the first ``:`` onwards is dropped.
    """
    server = get_request().get_server()
    # NOTE(review): a bracketed IPv6 literal such as "[::1]:8080" would be
    # cut at its first colon -- confirm such hosts never reach this code.
    domain, _separator, _port = server.partition(':')
    return domain
|
|
|
|
def get_identity_provider_config():
    """Locate the files describing the configured identity provider.

    The publisher configuration is reloaded first.  Its ``idp`` entry names
    a sub-directory of the ``idp`` directory resolved by ``get_abs_path``.

    Returns:
        A ``(metadata_path, public_key_path, ca_cert_chain_path)`` tuple.
        ``public_key_path`` and ``ca_cert_chain_path`` are None when the
        corresponding file does not exist; ``metadata_path`` is returned
        without any existence check.  When the configuration has no ``idp``
        entry the result is ``(None, None, None)``.
    """
    publisher = get_publisher()
    publisher.reload_cfg()
    idps_dir = get_abs_path('idp')

    # Membership test instead of the deprecated dict.has_key().
    if 'idp' not in publisher.cfg:
        return None, None, None

    idp_dir = os.path.join(idps_dir, publisher.cfg['idp'])
    metadata_path = os.path.join(idp_dir, 'metadata.xml')

    # The public key and the CA chain are optional: report them as None
    # when the file is absent.
    public_key_path = os.path.join(idp_dir, 'public_key')
    if not os.path.isfile(public_key_path):
        public_key_path = None

    ca_cert_chain_path = os.path.join(idp_dir, 'ca_cert_chain.pem')
    if not os.path.isfile(ca_cert_chain_path):
        ca_cert_chain_path = None

    return metadata_path, public_key_path, ca_cert_chain_path
|
|
|
|
def get_lasso_server(protocol='liberty'):
    """Build a ``lasso.Server`` for the proxied site of the current request.

    The server is created from the metadata file and ``private_key.pem``
    found in the proxied site directory.  When an identity provider is
    configured, it is added to the server with the IdP role.

    Args:
        protocol: ``'liberty'`` (uses ``metadata.xml``) or ``'saml2'``
            (uses ``saml2_metadata.xml``).

    Returns:
        The ``lasso.Server``, or None when no proxied site path is found or
        when adding the identity provider fails with
        ``SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH`` or
        ``SERVER_ERROR_ADD_PROVIDER_FAILED``.

    Raises:
        ValueError: if ``protocol`` is neither ``'liberty'`` nor ``'saml2'``.
        lasso.Error: any other error raised while adding the provider.
    """
    proxied_site_path = get_proxied_site_path()
    if proxied_site_path is None:
        return None

    # Only the metadata file name differs between the two protocols.
    metadata_names = {
        'liberty': 'metadata.xml',
        'saml2': 'saml2_metadata.xml',
    }
    if protocol not in metadata_names:
        # A real exception class: raising a bare string is itself a
        # TypeError on Python 2.6 and later.
        raise ValueError('Unknown protocol')

    server = lasso.Server(
        os.path.join(proxied_site_path, metadata_names[protocol]),
        os.path.join(proxied_site_path, 'private_key.pem'),
        None, None)

    metadata_path, public_key_path, ca_cert_chain_path = get_identity_provider_config()
    if metadata_path:
        try:
            server.addProvider(
                lasso.PROVIDER_ROLE_IDP,
                metadata_path,
                public_key_path,
                ca_cert_chain_path)
        except lasso.Error as error:
            code = error[0]
            # These two failures mean the provider is unusable with this
            # server; report that as "no server" instead of propagating.
            if (code == lasso.SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH
                    or code == lasso.SERVER_ERROR_ADD_PROVIDER_FAILED):
                return None
            raise

    return server
|
|
|
|
def get_provider_label(provider):
    """Return a human-readable label for a provider.

    The label is the text of the first ``OrganizationDisplayName`` element
    found in the string returned by ``provider.getOrganization()``.  If
    there is none, the first ``OrganizationName`` element is used instead.

    Args:
        provider: an object exposing ``providerId`` and, optionally,
            a ``getOrganization()`` method returning a string.

    Returns:
        None when ``provider`` is falsy; otherwise the organization name,
        or ``provider.providerId`` when the provider has no
        ``getOrganization`` method, no organization, or no name element.
    """
    if not provider:
        return None
    if not hasattr(provider, 'getOrganization'):
        return provider.providerId

    organization = provider.getOrganization()
    if not organization:
        return provider.providerId

    for tag in ('OrganizationDisplayName', 'OrganizationName'):
        # "[^>]*" keeps the match inside the opening tag.  A greedy ".*"
        # would run on to the last sibling element on the same line and
        # return its text instead of the first one.
        names = re.findall(r'<%s[^>]*>(.*?)</%s>' % (tag, tag), organization)
        if names:
            return names[0]
    return provider.providerId
|
|
|
|
def get_current_protocol():
    """Return the protocol conformance of the configured identity provider.

    Returns None when no identity provider metadata is configured or when
    lasso raises an error while loading the provider.
    """
    idp_metadata, idp_public_key, _ca_chain = get_identity_provider_config()
    if not idp_metadata:
        return None

    try:
        # The CA certificate chain is not passed here; None is used.
        idp = lasso.Provider(lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None)
    except lasso.Error:
        return None
    return idp.getProtocolConformance()
|
|
|