This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandayejs/mandayejs/mandaye/utils.py

154 lines
4.7 KiB
Python

# mandayejs - saml reverse proxy
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import json
import subprocess
import logging
import multiprocessing
import urlparse
from django.conf import settings
from django.shortcuts import resolve_url
from Cookie import SimpleCookie
logger = logging.getLogger(__name__)
def run(send_end, data, script):
    """Run a PhantomJS script and send its parsed result through a pipe.

    The script ``<BASE_DIR>/mandayejs/<script>`` is executed with the
    ``PHANTOM_JS_BINARY`` executable and receives ``data`` serialized as
    JSON on its standard input.  Its standard output must contain a JSON
    object wrapped between ``<mandayejs>`` and ``</mandayejs>`` tags.

    :param send_end: connection end used to send the result dictionary;
        it is closed once the result has been sent.
    :param data: JSON-serializable payload handed to the script.
    :param script: file name of the script to run.

    Nothing is returned: the result dictionary is delivered through
    ``send_end``.  When the output holds no tagged section, invalid JSON,
    or JSON that is not an object, ``{"result": "json_error"}`` is sent.
    """
    command = [
        settings.PHANTOM_JS_BINARY,
        '--ignore-ssl-errors=yes', '--ssl-protocol=any',
        os.path.join(settings.BASE_DIR, 'mandayejs', script),
    ]
    phantom = subprocess.Popen(
        command,
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    stdout, stderr = phantom.communicate(json.dumps(data))

    try:
        output = re.search('<mandayejs>(.*?)</mandayejs>', stdout, re.DOTALL)
        if not output:
            raise ValueError
        stdout = output.group(1)
        result = json.loads(stdout)
        # Valid JSON that is not an object (null, list, string...) would
        # make the ``result.get`` calls below raise, so the caller would
        # never receive anything; report it as a JSON error instead.
        if not isinstance(result, dict):
            raise ValueError
    except ValueError:
        result = {"result": "json_error"}
        logger.error("invalid json: %s (stderr: %s)", stdout, stderr)

    if result.get('stderr'):
        logger.warning(result['stderr'])

    if result.get('error'):
        logger.error('Error occured: %s', result.get('reason'))

    send_end.send(result)
    send_end.close()
def exec_phantom(data, script='do_login.js'):
    """Execute a PhantomJS script in a child process, bounded by a timeout.

    ``run`` is started in a separate process and its result is awaited on
    a one-way pipe for at most ``settings.PHANTOM_JS_TIMEOUT``.

    :param data: payload forwarded to ``run``.
    :param script: file name of the script, ``do_login.js`` by default.
    :return: the object sent by ``run``, or ``{'result': 'timeout'}`` when
        nothing was received in time.
    """
    receiver, sender = multiprocessing.Pipe(False)
    worker = multiprocessing.Process(target=run, args=(sender, data, script))
    worker.start()

    if not receiver.poll(settings.PHANTOM_JS_TIMEOUT):
        worker.terminate()
        sender.close()
        # Locators may carry credentials (passwords): keep them out of logs.
        context = dict(
            (key, value) for key, value in data.items() if key != 'locators')
        logger.error("PhantomJS process timeout, context: %s" % context)
        return {'result': 'timeout'}

    result = receiver.recv()
    receiver.close()
    return result
def cookie_builder(headers):
    """Build a ``SimpleCookie`` from a list of header mappings.

    For each mapping in ``headers`` the values are joined with ``'; '``,
    encoded as ASCII and loaded into a single cookie container.

    :param headers: iterable of mappings exposing ``values()``.
    :return: the populated ``SimpleCookie`` (empty when ``headers`` is).
    """
    jar = SimpleCookie()
    raw_headers = (
        '; '.join(entry.values()).encode('ascii') for entry in headers)
    for raw in raw_headers:
        jar.load(raw)
    return jar
def get_logout_info(request):
    """Return the data PhantomJS needs to perform a logout.

    :param request: the current request; it provides the absolute URL of
        the ``home`` view, the ``HTTP_COOKIE`` header and ``SERVER_NAME``.
    :return: a dictionary with the keys ``logout_locator`` (the
        ``SITE_LOGOUT_LOCATOR`` application setting), ``address`` (absolute
        URL of the ``home`` view) and ``cookies`` (the request cookies as a
        list of ``name``/``value``/``domain``/``path`` dictionaries).

    When the ``PHANTOM_JS_LOGOUT_SCHEME`` setting is set to a non-empty
    value, it replaces the scheme of ``address``.
    """
    from mandayejs.applications import get_app_settings
    app_settings = get_app_settings()

    logout_locator = app_settings.SITE_LOGOUT_LOCATOR
    address = request.build_absolute_uri(resolve_url('home'))

    # The setting is only honoured when truthy, so treat it as optional:
    # without a default, an unset setting raises AttributeError here.
    forced_logout_scheme = getattr(settings, 'PHANTOM_JS_LOGOUT_SCHEME', None)
    if forced_logout_scheme:
        url = urlparse.urlparse(address)
        address = url._replace(scheme=forced_logout_scheme).geturl()

    cookies = SimpleCookie(request.META.get('HTTP_COOKIE'))
    domain = request.META.get('SERVER_NAME')

    return {
        'logout_locator': logout_locator,
        'address': address,
        # Phantomjs Cookies Format
        'cookies': [{
            'name': key,
            'value': value.value,
            'domain': domain,
            'path': '/'
        } for key, value in cookies.items()],
    }
def get_password_field():
    """Return the name of the password field.

    The ``SITE_LOCATORS`` application setting is scanned for entries whose
    ``kind`` is ``'password'``.

    :return: the ``name`` of the first such entry, or ``None`` when there
        is none.
    """
    from mandayejs.applications import get_app_settings
    locators = get_app_settings().SITE_LOCATORS

    password_names = []
    for locator in locators:
        if locator.get('kind') == 'password':
            password_names.append(locator.get('name'))

    if not password_names:
        return None
    return password_names[0]
def get_login_info(request, credentials):
    """Return the data PhantomJS needs to perform a login.

    :param request: the current request, used to build the absolute URL of
        the ``SITE_LOGIN_PATH`` application setting.
    :param credentials: object whose ``to_login_info()`` result becomes the
        single entry of ``locators``.
    :return: a dictionary with the keys ``address``, ``cookies`` (always an
        empty list), ``locators``, ``auth_checker`` (``SITE_AUTH_CHECKER``
        joined to ``settings.STATIC_ROOT``) and ``form_submit_element``.
    """
    from mandayejs.applications import get_app_settings
    site = get_app_settings()

    login_info = {'cookies': []}
    login_info['auth_checker'] = os.path.join(
        settings.STATIC_ROOT, site.SITE_AUTH_CHECKER)
    login_info['address'] = request.build_absolute_uri(site.SITE_LOGIN_PATH)
    login_info['locators'] = [credentials.to_login_info()]
    login_info['form_submit_element'] = site.SITE_FORM_SUBMIT_ELEMENT
    return login_info
def get_idp():
    """Return the ``METADATA_URL`` of the first configured identity provider.

    The value is read from the first entry of
    ``settings.MELLON_IDENTITY_PROVIDERS``.
    """
    first_provider = settings.MELLON_IDENTITY_PROVIDERS[0]
    return first_provider['METADATA_URL']