This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
larpe/larpe/tags/release-1.1.1/larpe/misc.py

106 lines
3.2 KiB
Python

import re
import os
import lasso
from quixote import get_publisher, get_request
from qommon.misc import get_abs_path
from hosts import Host
def get_root_url():
    """Return the root URL of the current request.

    The URL is assembled as ``<scheme>://<server><SCRIPT_NAME>`` from the
    request's scheme, its server value and the ``SCRIPT_NAME`` entry of the
    request environment.
    """
    request = get_request()
    scheme = request.get_scheme()
    server = request.get_server()
    script_name = request.environ['SCRIPT_NAME']
    return '%s://%s%s' % (scheme, server, script_name)
def get_proxied_site_path():
    """Return the ``site_dir`` of the host found for the current request.

    The host is looked up with ``Host.get_host_from_url()``; when that
    lookup yields None, None is returned.
    """
    current_host = Host.get_host_from_url()
    return current_host.site_dir if current_host is not None else None
def get_proxied_site_domain():
    """Return the request's server value up to (not including) the first colon."""
    server = get_request().get_server()
    # Keep only what precedes the first ':' (e.g. drops a ":port" suffix).
    domain, _separator, _remainder = server.partition(':')
    return domain
def get_identity_provider_config():
    """Return the file paths describing the configured identity provider.

    The publisher configuration is reloaded first. Its ``'idp'`` entry names
    a sub-directory of the ``idp`` directory (resolved with ``get_abs_path``)
    in which the identity provider files are looked up.

    Returns:
        A ``(metadata_path, public_key_path, ca_cert_chain_path)`` tuple.
        ``public_key_path`` and ``ca_cert_chain_path`` are None when the
        corresponding file does not exist; ``metadata_path`` is returned
        without checking that the file exists. When the configuration has
        no ``'idp'`` entry the tuple is ``(None, None, None)``.
    """
    publisher = get_publisher()
    publisher.reload_cfg()
    idps_dir = get_abs_path('idp')

    # The ``in`` operator replaces dict.has_key(), which is deprecated in
    # Python 2 and no longer exists in Python 3.
    if 'idp' not in publisher.cfg:
        return None, None, None

    idp_dir = os.path.join(idps_dir, publisher.cfg['idp'])
    metadata_path = os.path.join(idp_dir, 'metadata.xml')

    # The public key and the CA certificate chain are optional files.
    public_key_path = os.path.join(idp_dir, 'public_key')
    if not os.path.isfile(public_key_path):
        public_key_path = None

    ca_cert_chain_path = os.path.join(idp_dir, 'ca_cert_chain.pem')
    if not os.path.isfile(ca_cert_chain_path):
        ca_cert_chain_path = None

    return metadata_path, public_key_path, ca_cert_chain_path
def get_lasso_server(protocol='liberty'):
    """Build a ``lasso.Server`` for the proxied site of the current request.

    Args:
        protocol: ``'liberty'`` (default) loads ``metadata.xml`` from the
            proxied site directory, ``'saml2'`` loads ``saml2_metadata.xml``.
            Both use ``private_key.pem`` from the same directory.

    Returns:
        The server, with the configured identity provider (if any) added
        with the IDP role. None is returned when there is no proxied site
        path, or when adding the identity provider fails with
        ``SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH`` or
        ``SERVER_ERROR_ADD_PROVIDER_FAILED``.

    Raises:
        ValueError: if ``protocol`` is neither ``'liberty'`` nor ``'saml2'``.
        lasso.Error: any other error raised while adding the provider.
    """
    proxied_site_path = get_proxied_site_path()
    if proxied_site_path is None:
        return None

    if protocol == 'liberty':
        metadata_name = 'metadata.xml'
    elif protocol == 'saml2':
        metadata_name = 'saml2_metadata.xml'
    else:
        # String exceptions (raise 'text') are not valid exceptions; raise a
        # real exception so the caller sees the intended error.
        raise ValueError('Unknown protocol: %r' % (protocol,))

    server = lasso.Server(
        os.path.join(proxied_site_path, metadata_name),
        os.path.join(proxied_site_path, 'private_key.pem'),
        None, None)

    metadata_path, public_key_path, ca_cert_chain_path = get_identity_provider_config()
    if metadata_path:
        try:
            server.addProvider(
                lasso.PROVIDER_ROLE_IDP,
                metadata_path,
                public_key_path,
                ca_cert_chain_path)
        except lasso.Error as error:
            # error[0] is compared against lasso error constants; these two
            # mean the provider could not be used, so no server is returned.
            if error[0] in (lasso.SERVER_ERROR_ADD_PROVIDER_PROTOCOL_MISMATCH,
                            lasso.SERVER_ERROR_ADD_PROVIDER_FAILED):
                return None
            raise
    return server
def get_provider_label(provider):
    """Return a display label for ``provider``.

    The label is the text of the first ``OrganizationDisplayName`` element
    found in the string returned by ``provider.getOrganization()``, or
    failing that the first ``OrganizationName`` element.

    Args:
        provider: object exposing ``providerId`` and, optionally, a
            ``getOrganization()`` method returning the organization markup
            as a string.

    Returns:
        None when ``provider`` is falsy. ``provider.providerId`` when the
        provider has no ``getOrganization`` method, when that method returns
        an empty value, or when neither element is found. Otherwise the
        text of the first matching element.
    """
    if not provider:
        return None
    if not hasattr(provider, 'getOrganization'):
        return provider.providerId

    organization = provider.getOrganization()
    if not organization:
        return provider.providerId

    for tag in ('OrganizationDisplayName', 'OrganizationName'):
        # [^>]* keeps the attribute match inside the opening tag; a greedy
        # ".*" would run past earlier elements on the same line and make the
        # first reported match the last element instead of the first.
        names = re.findall(r'<%s[^>]*>(.*?)</%s>' % (tag, tag), organization)
        if names:
            return names[0]
    return provider.providerId
def get_current_protocol():
    """Return the protocol conformance of the configured identity provider.

    The provider is loaded with lasso from the metadata and public key
    paths given by ``get_identity_provider_config()``; no CA certificate
    chain is passed to lasso here.

    Returns:
        The value of ``getProtocolConformance()`` on the loaded provider,
        or None when no metadata path is configured or when lasso raises
        ``lasso.Error`` while loading the provider.
    """
    idp_metadata, idp_public_key, _ca_chain = get_identity_provider_config()
    if not idp_metadata:
        return None

    try:
        idp = lasso.Provider(
            lasso.PROVIDER_ROLE_IDP, idp_metadata, idp_public_key, None)
    except lasso.Error:
        return None
    return idp.getProtocolConformance()