lasso/python/tests/websimulator.py

322 lines
10 KiB
Python

# -*- coding: UTF-8 -*-
# Python Lasso Simulator
#
# Copyright (C) 2004 Entr'ouvert
# http://lasso.entrouvert.org
#
# Author: Emmanuel Raviart <eraviart@entrouvert.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
class HttpRequest(object):
client = None # Principal or web site sending the request.
body = None
form = None
headers = None
method = None # 'GET' or 'POST' or 'PUT' or...
url = None
def __init__(self, client, method, url, headers = None, body = None, form = None):
self.client = client
self.method = method
self.url = url
if headers:
self.headers = headers
if body:
self.body = body
if form:
self.form = form
def send(self):
webSite = self.client.internet.getWebSite(self.url)
return webSite.doHttpRequest(self)
def getFormField(self, name, default = 'none'):
if self.form and name in self.form:
return self.form[name]
if default == 'none':
raise KeyError(name)
return default
def getQueryBoolean(self, name, default = 'none'):
try:
fieldValue = self.getQueryField(name)
except KeyError:
if default == 'none':
raise
return default
return fieldValue.lower not in ('', '0', 'false')
def getQuery(self):
splitedUrl = self.url.split('?', 1)
if len(splitedUrl) > 1:
return splitedUrl[1]
else:
return ''
def getQueryField(self, name, default = 'none'):
if self.query:
for field in self.query.split('&'):
fieldName, fieldValue = field.split('=')
if name == fieldName:
return fieldValue
if default == 'none':
raise KeyError(name)
return default
query = property(getQuery)
class HttpResponse(object):
body = None
headers = None
statusCode = None # 200 or...
statusMessage = None
def __init__(self, statusCode, statusMessage = None, headers = None, body = None):
self.statusCode = statusCode
if statusMessage:
self.statusMessage = statusMessage
if headers:
self.headers = headers
if body:
self.body = body
class Internet(object):
webSites = None
def __init__(self):
self.webSites = {}
def addWebSite(self, webSite):
self.webSites[webSite.url] = webSite
def getWebSite(self, url):
for webSiteUrl, webSite in self.webSites.iteritems():
if url.startswith(webSiteUrl):
return webSite
raise Exception('Unknown web site: %s' % url)
class Simulation(object):
test = None # The testing instance
def __init__(self, test):
self.test = test
def fail(self, msg = None):
return self.test.fail(msg)
def failIf(self, expr, msg = None):
return self.test.failIf(expr, msg)
def failIfAlmostEqual(self, first, second, places = 7, msg = None):
return self.test.failIfAlmostEqual(first, second, places, msg)
def failIfEqual(self, first, second, msg = None):
return self.test.failIfEqual(first, second, msg)
def failUnless(self, expr, msg = None):
return self.test.failUnless(expr, msg)
def failUnlessAlmostEqual(self, first, second, places = 7, msg = None):
return self.test.failUnlessAlmostEqual(first, second, places, msg)
def failUnlessRaises(self, excClass, callableObj, *args, **kwargs):
return self.test.failUnlessRaises(self, excClass, callableObj, *args, **kwargs)
def failUnlessEqual(self, first, second, msg = None):
return self.test.failUnlessEqual(first, second, msg)
class WebClient(object):
internet = None
keyring = None
requestHeaders = {
'User-Agent': 'LassoSimulator/0.0.0',
'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html',
}
webSessionIds = None # Simulate the cookies, stored in user's navigator, and containing the
# IDs of sessions already opened by the user.
def __init__(self, internet):
self.internet = internet
self.keyring = {}
self.webSessionIds = {}
def redirect(self, url):
return self.sendHttpRequest('GET', url)
def sendHttpRequest(self, method, url, headers = None, body = None, form = None):
requestHeaders = self.requestHeaders
if headers:
requestHeaders = self.requestHeaders.copy()
for name, value in headers.iteritems():
requestHeaders[name] = value
else:
requestHeaders = self.requestHeaders
return HttpRequest(
self, method, url, headers = requestHeaders, body = body, form = form).send()
def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None,
form = None):
url = webSite.url
if path:
if path[0] == '/':
while url[-1] == '/':
url = url[:-1]
elif url[-1] != '/':
url += '/'
url += path
return self.sendHttpRequest(method, url, headers = headers, body = body, form = form)
class Principal(WebClient):
"""Simulation of a user and its web navigator"""
name = None # The user name
def __init__(self, internet, name):
WebClient.__init__(self, internet)
self.name = name
class WebSession(object):
"""Simulation of session of a web site"""
expirationTime = None # A sample session variable
loginDump = None # Used only by some identity providers
uniqueId = None # The session number
sessionDump = None
webUserId = None # ID of logged user.
def __init__(self, uniqueId):
self.uniqueId = uniqueId
class WebUser(object):
"""Simulation of user of a web site"""
identityDump = None
language = 'fr' # A sample user variable
uniqueId = None # The user name is used as an ID in this simulation.
def __init__(self, uniqueId):
self.uniqueId = uniqueId
class WebSite(WebClient, Simulation):
"""Simulation of a web site"""
lastWebSessionId = 0
providerId = None # The Liberty providerID of this web site
responseHeaders = {
'Server': 'Lasso Simulator Web Server',
}
url = None # The main URL of web site
webUsers = None
webSessions = None
def __init__(self, test, internet, url):
Simulation.__init__(self, test)
WebClient.__init__(self, internet)
self.url = url
self.webUserIdsByNameIdentifier = {}
self.webUsers = {}
self.webSessionIdsByNameIdentifier = {}
self.webSessions = {}
self.internet.addWebSite(self)
def addWebUser(self, name):
self.webUsers[name] = WebUser(name)
def createWebSession(self, client):
self.lastWebSessionId += 1
webSession = WebSession(self.lastWebSessionId)
self.webSessions[self.lastWebSessionId] = webSession
client.webSessionIds[self.url] = self.lastWebSessionId
return webSession
def doHttpRequest(self, httpRequest):
url = httpRequest.url
if url.startswith(self.url):
url = url[len(self.url):]
methodName = url.split('?', 1)[0].replace('/', '')
method = getattr(self, methodName)
return method(httpRequest)
def getIdentityDump(self, principal):
webSession = self.getWebSession(principal)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is None:
return None
return webUser.identityDump
def getSessionDump(self, principal):
webSession = self.getWebSession(principal)
if webSession is None:
return None
return webSession.sessionDump
def getWebSession(self, principal):
webSessionId = principal.webSessionIds.get(self.url, None)
if webSessionId is None:
# The user has no web session opened on this site.
return None
return self.webSessions.get(webSessionId, None)
def getWebSessionFromNameIdentifier(self, nameIdentifier):
webSessionId = self.webSessionIdsByNameIdentifier.get(nameIdentifier, None)
if webSessionId is None:
# The user has no federation on this site or has no authentication assertion for this
# federation.
return None
return self.webSessions.get(webSessionId, None)
def getWebUserFromNameIdentifier(self, nameIdentifier):
webUserId = self.webUserIdsByNameIdentifier.get(nameIdentifier, None)
if webUserId is None:
# The user has no federation on this site.
return None
return self.webUsers.get(webUserId, None)
def getWebUserFromWebSession(self, webSession):
if webSession is None:
return None
webUserId = webSession.webUserId
if webUserId is None:
# The user has no account on this site.
return None
return self.webUsers.get(webUserId, None)
def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None):
responseHeaders = self.responseHeaders
if headers:
responseHeaders = self.responseHeaders.copy()
for name, value in headers.iteritems():
responseHeaders[name] = value
else:
responseHeaders = self.responseHeaders
return HttpResponse(
statusCode, statusMessage = statusMessage, headers = headers, body = body)