This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
expression/src/core/stations.py

1220 lines
50 KiB
Python

# -*- coding: UTF-8 -*-
# Expression
# By: Frederic Peters <fpeters@entrouvert.com>
# Emmanuel Raviart <eraviart@entrouvert.com>
#
# Copyright (C) 2004 Entr'ouvert, Frederic Peters & Emmanuel Raviart
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Stations Module"""
import os
import types
import urllib
import libxml2
import libxslt
import environs
import faults
import http
import locations
import logs
import modules
import namespaces
import xpaths
# Nesting depth of the station walk in progress. AbstractStation.walk() increments and
# decrements it around each step and only uses it to indent its log lines.
_depth = 0
class AbstractStation(object):
_inRawMode = None # Boolean. When None, indicates inherited (=> ask parent).
_parent = None
isRemote = False
isRootElder = False
isUriDirectory = False # True for directory-like stations (see getUriDirectoryAbsolutePath)
previous = None
remoteUri = None
uriAuthority = None
uriPathFragment = None
uriScheme = None
def __init__(self, previous = None, parent = None, uriPathFragment = None,
        uriScheme = None, uriAuthority = None, isRemote = False,
        remoteUri = None):
    """Initialise either a remote station or a local one.

    A remote station is described by ``remoteUri`` alone: every other argument must be
    None. A local station must not receive a ``remoteUri``; each of its other arguments
    replaces the matching class-level default only when it is not None.
    ``previous`` and ``parent`` must be AbstractStation instances.
    """
    if isRemote:
        for unexpected in (previous, parent, uriPathFragment, uriScheme, uriAuthority):
            assert unexpected is None
        assert remoteUri
        self.isRemote = isRemote
        self.remoteUri = remoteUri
        return
    assert remoteUri is None
    # Links to other stations are type-checked before being stored.
    for attributeName, station in (("previous", previous), ("_parent", parent)):
        if station is not None:
            assert isinstance(station, AbstractStation)
            setattr(self, attributeName, station)
    # Plain URI parts are stored as given.
    for attributeName, value in (
            ("uriPathFragment", uriPathFragment),
            ("uriScheme", uriScheme),
            ("uriAuthority", uriAuthority)):
        if value is not None:
            setattr(self, attributeName, value)
def __repr__(self):
    """Return ``<module.Class at path>``; the path reads "no path" when it cannot be computed."""
    try:
        path = self.getAbsolutePath()
    except Exception:
        # A repr must never fail, but interpreter-exit requests (KeyboardInterrupt,
        # SystemExit) must not be swallowed either, hence no bare "except:".
        path = "no path"
    stationClass = self.__class__
    return "<%s.%s at %s>" % (stationClass.__module__, stationClass.__name__, path)
def appendSessionTokenToUrl(self, uri):
    """Return the uri argument, appended with the session token if needed.

    The token is added when cookies cannot be used, or while cookie support is still
    being tested, and only when the current session asks for its token to be published.
    """
    cookiesSuffice = environs.getVar("canUseCookie", default = False) \
        and not environs.getVar("testCookieSupport", default = False)
    if cookiesSuffice:
        return uri
    session = environs.getVar("session", default = None)
    if session is None or not session.publishToken:
        return uri
    return locations.appendParameterToUrl(uri, "sessionToken", session.token)
def callHttpFunction(self, function, uriPathFragments = None, command = "GET",
        instruction = None):
    """Call ``function(*uriPathFragments)`` inside a dedicated environment level.

    The level exposes this station as ``currentStation``, the command as ``httpCommand``
    (HEAD is presented as GET) and the instruction. It is always pulled afterwards,
    even when ``function`` raises. Returns whatever ``function`` returns.
    """
    effectiveCommand = command
    if effectiveCommand == "HEAD":
        effectiveCommand = "GET"
    arguments = uriPathFragments
    if arguments is None:
        arguments = ()
    environs.push(
        _level = "callHttpFunction",
        currentStation = self,
        httpCommand = effectiveCommand,
        instruction = instruction)
    try:
        return function(*arguments)
    finally:
        environs.pull(_level = "callHttpFunction")
def checkAccess(self, uriPathFragments, command = None, instruction = None):
    """Tell whether this station exists and may be accessed.

    Returns True when both checks pass. When one fails, returns False if ``instruction``
    is a probe covering that check ("exists", "isAuthorized", "existsAndIsAuthorized"),
    otherwise raises faults.PathNotFound or faults.PathUnauthorized.
    Note that ``command`` is accepted but the authorization check is made with None.
    """
    found = self.exists()
    if found and self.isAccessAuthorized(None):
        return True
    if found:
        silentInstructions = ("existsAndIsAuthorized", "isAuthorized")
    else:
        silentInstructions = ("exists", "existsAndIsAuthorized")
    if instruction in silentInstructions:
        return False
    path = "/".join(uriPathFragments)
    if found:
        raise faults.PathUnauthorized(path)
    raise faults.PathNotFound(path)
def checkAccessAndWalk(self, uriPathFragments, command = None, instruction = None):
    """Walk from this station when checkAccess() allows it; return False otherwise."""
    if self.checkAccess(uriPathFragments, command, instruction):
        return self.walk(uriPathFragments, command, instruction)
    return False
def constructAbsoluteUri(self, location):
    """Return ``location`` as an absolute URI.

    A location with a ":" among its first ten characters ("http:", "mailto:", ...) is
    returned untouched. Anything else is resolved to an absolute path and prefixed with
    the "uriScheme" and "uriAuthority" environment variables.
    """
    lowered = location.lower()
    if ":" in lowered[:10]:
        # Don't touch location if it begins with "http:", "mailto", etc.
        return location
    # Relative URI, which may or not start with "file://".
    # FIXME: This is not the good solution. We need a better way to
    # convert file systems path to URI paths.
    # NOTE(review): a "file://" location contains ":" in its first ten characters, so
    # it has already been returned above and the first branch below never runs.
    if not lowered.startswith("file://"):
        absolutePath = self.constructUriAbsolutePath(location)
    else:
        absolutePath = location[len("file://") - 1:]
        while len(absolutePath) > 1 and absolutePath[1] == "/":
            absolutePath = absolutePath[1:]
    if absolutePath == "/":
        absolutePath = ""
    scheme = environs.getVar("uriScheme")
    authority = environs.getVar("uriAuthority")
    return "%s://%s%s" % (scheme, authority, absolutePath)
def constructUri(self, location, preserveAbsolutePath = False):
    """Return the best URI to use inside web pages.

    A location with a ":" among its first ten characters ("http:", "mailto:", ...) is
    returned untouched. Any other location is assumed to be a relative URI without
    "file://" prefix: it is resolved with constructUriAbsolutePath(), stripped of any
    existing "sessionToken" parameter, then given the current session token through
    appendSessionTokenToUrl(), so that the token policy lives in a single place.
    """
    if location.find(":", 0, 10) >= 0:
        # Don't touch location if it begins with "http:", "mailto", etc.
        return location
    uri = self.constructUriAbsolutePath(
        location, preserveAbsolutePath = preserveAbsolutePath)
    uri = locations.cleanUpUrl(uri, "sessionToken")
    return self.appendSessionTokenToUrl(uri)
def constructUriAbsolutePath(self, location, preserveAbsolutePath = False):
    """Resolve a relative URI (without "file://" prefix) to an absolute URI path.

    - "//..." (network path): not supported, raises NotImplementedError.
    - "/..." : returned as is when ``preserveAbsolutePath`` is true, otherwise appended
      to the "httpScriptDirectoryPath" environment variable.
    - "#..." : appended to this station's own URI path.
    - anything else: appended to this station's URI directory path.
    """
    if location.startswith("//"):
        # Relative URI with network path. (The original raised the string "TODO", which
        # is not a valid exception.)
        raise NotImplementedError(
            "Relative URI with network path is not supported: %s" % location)
    if location.startswith("#"):
        return "%s%s" % (self.getUriAbsolutePath(), location)
    if location.startswith("/"):
        # Relative URI with absolute path.
        if preserveAbsolutePath:
            return location
        basePath = environs.getVar("httpScriptDirectoryPath")
        relativePart = location[1:]
    else:
        # Relative URI with relative path.
        basePath = self.getUriDirectoryAbsolutePath()
        relativePart = location
    # endswith() also copes with an empty base path, where indexing [-1] would fail.
    if basePath.endswith("/"):
        return "%s%s" % (basePath, relativePart)
    return "%s/%s" % (basePath, relativePart)
def constructUriInternPath(self, location):
    """Resolve a relative URI (without "file://" prefix) to an intern URI path.

    - "//..." (network path): not supported, raises NotImplementedError.
    - "/..." : returned unchanged.
    - "#..." : appended to this station's own intern path.
    - anything else: appended to this station's intern directory path.
    """
    if location.startswith("//"):
        # Relative URI with network path. (The original raised the string "TODO", which
        # is not a valid exception.)
        raise NotImplementedError(
            "Relative URI with network path is not supported: %s" % location)
    if location.startswith("/"):
        # Relative URI with absolute path.
        return location
    if location.startswith("#"):
        return "%s%s" % (self.getUriInternPath(), location)
    # Relative URI with relative path.
    internPath = self.getUriDirectoryInternPath()
    # endswith() also copes with an empty directory path, where indexing [-1] would fail.
    if internPath.endswith("/"):
        return "%s%s" % (internPath, location)
    return "%s/%s" % (internPath, location)
def convertAbsolutePathToBaseRelative(self, absolutePath):
    """Return ``absolutePath`` relative to the configured "yep:base" directory.

    The part shared with the base directory (default "/") is removed and the result
    always starts with "/".
    """
    baseAbsolutePath = self.getConfigAbsolutePath("yep:base", default = "/")
    prefix = os.path.commonprefix([absolutePath, baseAbsolutePath])
    # commonprefix() compares character by character, so base "/var/www" and path
    # "/var/www2/x" share "/var/www". Cut such a prefix back to the last complete
    # directory, otherwise the result would be "/2/x".
    if not prefix.endswith("/"):
        for path in (absolutePath, baseAbsolutePath):
            if len(path) > len(prefix) and path[len(prefix)] != "/":
                prefix = prefix[:prefix.rfind("/") + 1]
                break
    baseRelativePath = absolutePath[len(prefix):]
    if not baseRelativePath.startswith("/"):
        baseRelativePath = "/" + baseRelativePath
    return baseRelativePath
def convertBaseRelativePathToAbsolute(self, baseRelativePath):
    """Return the normalized absolute path of a path relative to "yep:base".

    Leading slashes are ignored, so "/", "" and "//" all designate the base directory
    itself (default "/").
    """
    # lstrip() instead of a "while path[0] == '/'" loop: the loop raised IndexError as
    # soon as the path became empty (for instance for "/").
    relativePath = baseRelativePath.lstrip("/")
    baseAbsolutePath = self.getConfigAbsolutePath("yep:base", default = "/")
    return os.path.normpath(os.path.join(baseAbsolutePath, relativePath))
def convertNodesToAbsolutePath(self, nodes, nodesOwner, xpath, default = 'none'):
    """Convert the stripped content of the first node to an absolute path.

    The location is resolved by ``nodesOwner``. Without nodes, ``default`` is returned,
    unless it is the string 'none', in which case KeyError(xpath) is raised.
    """
    if nodes:
        location = nodes[0].content.strip()
        return nodesOwner.convertRelativeLocationToAbsolutePath(location)
    if default == 'none':
        raise KeyError(xpath)
    return default
def convertNodesToList(self, nodes, nodesOwner, xpath, default = "none"):
    """Return the list of the nodes' contents.

    Without nodes, ``default`` is returned, unless it is the string "none", in which
    case KeyError(xpath) is raised.
    """
    if nodes:
        return [node.content for node in nodes]
    if default == "none":
        raise KeyError(xpath)
    return default
def convertNodesToPythonClass(self, nodes, nodesOwner, xpath, default = "none"):
    """Resolve the dotted path held by the first node ("package.module.Name") to an object.

    The module part is imported, then each name after the first one is looked up as an
    attribute. Without nodes, or when a name is missing, ``default`` is returned, unless
    it is the string "none", in which case KeyError(xpath) is raised.
    """
    if not nodes:
        if default == "none":
            raise KeyError(xpath)
        return default
    dottedPath = nodes[0].content.strip()
    # The path must contain a module part and a final name.
    assert "." in dottedPath
    modulePath = dottedPath[:dottedPath.rfind(".")]
    # __import__ returns the top-level package; the loop walks down from there.
    resolved = __import__(modulePath)
    try:
        for attributeName in dottedPath.split(".")[1:]:
            resolved = getattr(resolved, attributeName)
    except AttributeError:
        logs.info('Unknown Python class = "%s"' % dottedPath)
        if default == "none":
            raise KeyError(xpath)
        return default
    return resolved
def convertNodesToString(self, nodes, nodesOwner, xpath, default = "none"):
    """Return the content of the first node, unstripped.

    Without nodes, ``default`` is returned, unless it is the string "none", in which
    case KeyError(xpath) is raised.
    """
    if nodes:
        return nodes[0].content
    if default == "none":
        raise KeyError(xpath)
    return default
def convertPathToAbsolute(self, path):
    """Return the absolute path of ``path``.

    A path starting with "/" is relative to the base directory; any other path is
    relative to this station's directory and is normalized.
    """
    if not path.startswith("/"):
        directoryPath = self.getDirectoryAbsolutePath()
        return os.path.normpath(os.path.join(directoryPath, path))
    return self.convertBaseRelativePathToAbsolute(path)
def convertPathToBaseRelative(self, path):
    """Return ``path`` relative to the base directory.

    A path starting with "/" is already base-relative and is returned unchanged; any
    other path is joined to this station's base-relative directory and normalized.
    """
    if not path.startswith("/"):
        directoryPath = self.getDirectoryBaseRelativePath()
        return os.path.normpath(os.path.join(directoryPath, path))
    return path
def convertRelativeLocationToAbsolutePath(self, location):
    """Return the absolute file path designated by ``location``, or None.

    - empty location: None.
    - "http://" or "https://" URL: looked up as a system identifier in the XML catalog.
    - "-//..." : looked up as a public identifier in the XML catalog.
    - anything else: handled as a path by convertPathToAbsolute().
    A catalog miss is logged and yields None; a hit is returned with its first
    len("file://") characters removed.
    """
    if not location:
        return None
    if location.startswith('http://') or location.startswith('https://'):
        # Location is an absolute URL. Use it as a system identifier in XML catalog.
        resolveInCatalog = libxml2.catalogResolveSystem
    elif location.startswith('-//'):
        # Location is a public URI. Use it as a public identifier in XML catalog.
        resolveInCatalog = libxml2.catalogResolvePublic
    else:
        return self.convertPathToAbsolute(location)
    fileUri = resolveInCatalog(location)
    if fileUri is None:
        logs.debug('Unknown location "%s" in XML catalog.' % location)
        return None
    return fileUri[len('file://'):]
def doHttpAccess(self):
    """Default function used by walk() when there is no HTTP command: return this station."""
    return self
def doHttpDelete(self):
    """Refuse an HTTP DELETE: read (and log) the request body, then answer "forbidden"."""
    submittedContent = environs.getVar("submission").readFile()
    if submittedContent is not None:
        logs.debug(submittedContent)
    httpRequestHandler = environs.getVar("httpRequestHandler")
    httpRequestHandler.outputErrorAccessForbidden(self.getUriAbsolutePath())
def doHttpDelete1(self, multistatus):
    """Record in ``multistatus`` that deleting this station is forbidden; return False.

    A "response" child holding the station "href" and a 403 "status" is appended to the
    multistatus node.
    """
    responseNode = multistatus.node.newTextChild(None, "response", None)
    # The href URI is quoted, because Nautilus requires that the returned href is the
    # same as the URL in the PROPFIND command. Otherwise it can not open "untitled
    # folder", because it expects "untitled%20folder".
    quotedUri = self.getAbsoluteUri(quote = True)
    responseNode.newTextChild(None, "href", quotedUri)
    responseNode.newTextChild(None, "status", "HTTP/1.1 403 Access Forbidden")
    return False
def doHttpGet(self):
    """Answer an HTTP GET: the source in raw mode, the styled rendering otherwise."""
    if not self.getInRawMode():
        return self.styled()
    return self.source()
def doHttpMkcol(self):
    """Refuse an HTTP MKCOL on an existing station.

    The request body is read (and logged), then "method not allowed" is answered.
    """
    submittedContent = environs.getVar("submission").readFile()
    if submittedContent is not None:
        logs.debug(submittedContent)
    message = "%s already exists" % self.getUriAbsolutePath()
    environs.getVar("httpRequestHandler").outputErrorMethodNotAllowed(message)
def doHttpOptions(self):
    """Answer an HTTP OPTIONS request with an empty body and a ``DAV: 1`` header."""
    # We need to flush the input buffer anyway.
    submittedContent = environs.getVar("submission").readFile()
    if submittedContent is not None:
        logs.debug(submittedContent)
    httpRequestHandler = environs.getVar("httpRequestHandler")
    httpRequestHandler.outputData("", contentLocation = None, headers = {"DAV": "1"})
def doHttpPost(self):
    """Handle an HTTP POST. Not implemented here: always raises NotImplementedError."""
    # FIXME
    raise NotImplementedError
def doHttpPropfind(self):
    """Handle an HTTP PROPFIND request (WebDAV).

    The request body is read from the "submission" environment variable; when there is
    none, a default request asking for the usual DAV properties is used. The "Depth"
    header must be 0, 1 or "infinity" (any letter case, the default); anything else
    raises faults.BadRequest. The parsed request and the depth (an int, or the string
    "infinity") are then handed to replyToHttpPropfindRequest().
    """
    import documents
    content = environs.getVar("submission").readFile()
    if content is not None:
        logs.debug(content)
    else:
        #If no content, we construct an empty but valid XML content.
        content = """<?xml version="1.0" encoding="utf-8"?>
<propfind xmlns="DAV:"><prop>
<getlastmodified xmlns="DAV:"/>
<creationdate xmlns="DAV:"/>
<resourcetype xmlns="DAV:"/>
<getcontenttype xmlns="DAV:"/>
<getcontentlength xmlns="DAV:"/>
<getetag xmlns="DAV:"/>
</prop></propfind>"""
    depthHeader = environs.getVar("httpRequestHandler").headers.get("Depth", "infinity")
    try:
        depth = int(depthHeader)
    except ValueError:
        # Header tokens are case-insensitive: accept "Infinity" as well as "infinity",
        # and always pass the lower-case spelling on.
        if depthHeader.strip().lower() != "infinity":
            raise faults.BadRequest(
                "Bad depth header for PROPFIND request = %s" % depthHeader)
        depth = "infinity"
        # Note: A PROPFIND request with a Depth: Infinity header can impose a large burden on
        # the server. The WebDAV Working Group has stated that it is acceptable for DAV servers
        # to refuse these kinds of requests. Properly written client software should not
        # issue them.
    else:
        # Only 0 and 1 are valid numeric depths; negative values used to slip through.
        if depth < 0 or depth > 1:
            raise faults.BadRequest("Bad depth header for PROPFIND request = %s" % depth)
    # NOTE(review): the body comes from the client and is parsed with XML_PARSE_DTDLOAD;
    # XML_PARSE_NONET blocks network access, but local DTDs can still be loaded.
    requestDocumentNode = libxml2.readDoc(
        content, None, None,
        libxml2.XML_PARSE_DTDLOAD | libxml2.XML_PARSE_NONET)
    requestDocument = documents.TemporaryDocument(requestDocumentNode)
    self.replyToHttpPropfindRequest(requestDocument, depth)
def doHttpPut(self):
    """Refuse an HTTP PUT: answer "method not allowed" with the reason "Not a document"."""
    environs.getVar("httpRequestHandler").outputErrorMethodNotAllowed("Not a document")
def download(self):
    """Handles HTTP GET.

    Any other command of the current environment raises faults.PathNotFound.
    """
    if environs.getVar("httpCommand") != "GET":
        raise faults.PathNotFound("")
    self.outputHttpDownload()
def exists(self):
    """Tell whether this station exists. Always True in this implementation."""
    return True
def generatePlainText(self):
    """Return the plain text of this station: here, a logged notice and an empty string."""
    notice = "generatePlainText: No plain text generation for %s." % self
    logs.debug(notice)
    return u""
def generateXml(self, layout):
    """Generate the XML of this station into ``layout``.

    This implementation generates nothing: it logs that fact and returns False.
    """
    # The message used to name "generateHtml", which does not exist here and made the
    # log misleading.
    logs.debug("generateXml: No XML generation for %s." % self)
    return False
def generateXmlDocument(self):
    """Return a new temporary document that generateXml() has been asked to fill."""
    import documents
    temporaryDocument = documents.newTemporaryDocument()
    # The result of generateXml() is deliberately ignored: the document is returned anyway.
    self.generateXml(temporaryDocument)
    return temporaryDocument
def generateXmlDocumentStyledData(self, xmlDocument, stylesheet = None):
    """Return a ``(mimeType, data)`` couple for ``xmlDocument``.

    Without a stylesheet, or when it cannot be parsed as XSLT, the document is
    serialized as is, with its own MIME type. Otherwise the stylesheet is applied (with
    the "xsltParams" environment variable as parameters) and the result is returned as
    "text/html". The libxslt stylesheet and the result document are freed even when the
    transformation or its serialization fails.
    """
    if stylesheet is None:
        return xmlDocument.mimeType, xmlDocument.serialize()
    stylesheetHolder = stylesheet.prototype.getDataHolder()
    stylesheetStyle = libxslt.parseStylesheetDoc(stylesheetHolder.node)
    if stylesheetStyle is None:
        return xmlDocument.mimeType, xmlDocument.serialize()
    try:
        environs.push(_level = "generateXmlDocumentStyledData",
            currentXmlDocument = xmlDocument,
            currentStation = self)
        try:
            styledDoc = stylesheetStyle.applyStylesheet(
                xmlDocument.node, environs.getVar("xsltParams", None))
        finally:
            environs.pull(_level = "generateXmlDocumentStyledData")
        try:
            data = stylesheetStyle.saveResultToString(styledDoc)
        finally:
            # Guarded so that a failed transformation is not masked by an AttributeError.
            if styledDoc is not None:
                styledDoc.freeDoc()
    finally:
        stylesheetStyle.freeStylesheet()
    return "text/html", data
def getAbsolutePath(self):
    """Return the absolute path of this station: here, the one of its parent.

    A parent is required (assertion).
    """
    parentStation = self.getParent()
    assert parentStation is not None
    return parentStation.getAbsolutePath()
def getAbsoluteUri(self, quote = False):
    """Returns the absolute URI for this station.

    example : http://localhost:1997/test.xml

    A remote station resolves its remote URI (``quote`` is then not used). Otherwise
    the URI of the parent, or the scheme and authority when there is no parent, is
    extended with this station's path fragment, URL-quoted when ``quote`` is true.
    """
    if self.isRemote:
        return self.constructAbsoluteUri(self.remoteUri)
    parent = self.getParent()
    if parent is not None:
        absoluteUri = parent.getAbsoluteUri(quote = quote)
    else:
        absoluteUri = self.getAbsoluteUriSchemeAndAuthority()
    fragment = self.uriPathFragment
    if fragment and quote:
        fragment = urllib.quote(fragment)
    if not fragment:
        return absoluteUri
    if absoluteUri[-1] == "/":
        return absoluteUri + fragment
    return "%s/%s" % (absoluteUri, fragment)
def getAbsoluteUriSchemeAndAuthority(self):
    """Return the beginning of the absolute URIs of this station.

    "file:" when no scheme is set, "scheme://authority" for http and https; any other
    scheme raises an Exception.
    """
    scheme = self.uriScheme
    if scheme is None:
        return "file:"
    if scheme not in ("http", "https"):
        raise Exception("Unknown URI scheme = %s" % scheme)
    return "%s://%s" % (scheme, self.uriAuthority)
def getActionUri(self, action = None):
    """Return the URI of ``action`` on this station, with the session token if needed.

    The path comes from getActionUriAbsolutePath(); the token policy is the one of
    appendSessionTokenToUrl(), as for getUri(), instead of a private copy of it.
    """
    return self.appendSessionTokenToUrl(self.getActionUriAbsolutePath(action))
def getActionUriAbsolutePath(self, action = None):
    """Return the URI path of this station, followed by "/action" when an action is given."""
    uri = self.getUriAbsolutePath()
    if not action:
        return uri
    if uri[-1] == "/":
        return "%s%s" % (uri, action)
    return "%s/%s" % (uri, action)
def getAdministrators(self, command = None):
    """Return the configured administrators element for ``command``, or None.

    No command is looked up as "ACCESS" and HEAD as "GET". The first
    "yep:administrators" config node without "commands" attribute, or whose attribute
    contains the command name, is wrapped with elements.newElement().
    """
    commandName = command
    if commandName is None:
        commandName = "ACCESS"
    elif commandName == "HEAD":
        commandName = "GET"
    # When there is no "commands" attribute, it means "all commands".
    xpath = "yep:administrators[not(@commands) or contains(@commands, '%s')]" \
        % commandName.replace("'", "&apos;")
    administratorsNodes, administratorsNodesOwner = self.getConfigNodesAndOwner(xpath)
    if not administratorsNodes:
        return None
    import elements
    return elements.newElement(administratorsNodes[0], owner = administratorsNodesOwner)
def getAllConfigElementNodesAndOwnerCouples(self, xpath):
    """Delegate the lookup of all config element nodes for ``xpath`` upwards.

    The request goes to the parent station. Without parent (root directory of a virtual
    host) it goes to the "virtualHost" environment variable, else to "configuration";
    when neither is set, ``([], self)`` is returned.
    """
    parent = self.getParent()
    if parent is not None:
        return parent.getAllConfigElementNodesAndOwnerCouples(xpath)
    # Root directory of a virtual host.
    for variableName in ("virtualHost", "configuration"):
        provider = environs.getVar(variableName, default = None)
        if provider is not None:
            return provider.getAllConfigElementNodesAndOwnerCouples(xpath)
    return [], self
def getBaseRelativePath(self):
    """Return the base-relative form of getBaseAbsolutePath()."""
    # NOTE(review): getBaseAbsolutePath() is not defined in the visible part of this
    # class; presumably it is provided further down or by subclasses - confirm.
    baseAbsolutePath = self.getBaseAbsolutePath()
    return self.convertAbsolutePathToBaseRelative(baseAbsolutePath)
def getConfigAbsolutePath(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the config value at ``xpath`` as an absolute path.

    See convertNodesToAbsolutePath() for the handling of ``default``.
    """
    foundNodes, owner = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)
    return self.convertNodesToAbsolutePath(foundNodes, owner, xpath, default = default)
def getConfigBaseRelativePath(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the config value at ``xpath`` as a path relative to the base directory."""
    return self.convertAbsolutePathToBaseRelative(
        self.getConfigAbsolutePath(
            xpath, default = default, ignoreGeneralValue = ignoreGeneralValue))
def getConfigBoolean(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the config value at ``xpath`` as a boolean.

    The first node is false when its stripped, lower-cased content is "0" or "false",
    true otherwise. Without nodes, ``default`` is returned, unless it is the string
    "none", in which case KeyError(xpath) is raised.
    """
    nodes, nodesOwner = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)
    if nodes:
        text = nodes[0].content.strip().lower()
        return text not in ("0", "false")
    if default == "none":
        raise KeyError(xpath)
    return default
def getConfigDataHolderNodesAndOwner(self, mimeType, xpath, ignoreGeneralValue = False):
    """Delegate the lookup of data holder config nodes for ``mimeType`` upwards.

    The request goes to the parent station. Without parent (root directory of a virtual
    host) it goes to the "virtualHost" environment variable, else to "configuration";
    when neither is set, ``([], self)`` is returned.
    """
    parent = self.getParent()
    if parent is not None:
        return parent.getConfigDataHolderNodesAndOwner(
            mimeType, xpath, ignoreGeneralValue = ignoreGeneralValue)
    # Root directory of a virtual host.
    for variableName in ("virtualHost", "configuration"):
        provider = environs.getVar(variableName, default = None)
        if provider is not None:
            return provider.getConfigDataHolderNodesAndOwner(
                mimeType, xpath, ignoreGeneralValue = ignoreGeneralValue)
    return [], self
def getConfigElementNodesAndOwner(self, namespaceUri, name, xpath, ignoreGeneralValue = False):
    """Delegate the lookup of the config nodes of element ``name`` upwards.

    The request goes to the parent station. Without parent (root directory of a virtual
    host) it goes to the "virtualHost" environment variable, else to "configuration";
    when neither is set, ``([], self)`` is returned.
    """
    parent = self.getParent()
    if parent is not None:
        return parent.getConfigElementNodesAndOwner(
            namespaceUri, name, xpath, ignoreGeneralValue = ignoreGeneralValue)
    # Root directory of a virtual host.
    for variableName in ("virtualHost", "configuration"):
        provider = environs.getVar(variableName, default = None)
        if provider is not None:
            return provider.getConfigElementNodesAndOwner(
                namespaceUri, name, xpath, ignoreGeneralValue = ignoreGeneralValue)
    return [], self
def getConfigElementPythonClass(self, namespaceUri, name, xpath, default = "none",
        ignoreGeneralValue = False):
    """Return the Python class configured for element ``name`` at ``xpath``.

    See convertNodesToPythonClass() for the handling of ``default``.
    """
    foundNodes, owner = self.getConfigElementNodesAndOwner(
        namespaceUri, name, xpath, ignoreGeneralValue = ignoreGeneralValue)
    return self.convertNodesToPythonClass(foundNodes, owner, xpath, default = default)
def getConfigExistence(self, xpath, ignoreGeneralValue = False):
    """Tell whether the configuration holds at least one node at ``xpath``."""
    foundNodes = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)[0]
    if foundNodes:
        return True
    return False
def getConfigList(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the contents of the config nodes at ``xpath`` as a list.

    See convertNodesToList() for the handling of ``default``.
    """
    foundNodes, owner = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)
    return self.convertNodesToList(foundNodes, owner, xpath, default = default)
def getConfigNodesAndOwner(self, xpath, ignoreGeneralValue = False):
    """Return the config nodes at ``xpath`` and their owner.

    This implementation only knows station-level values: ``ignoreGeneralValue`` must be
    false (assertion) and the lookup is done by getConfigStationNodesAndOwner().
    """
    assert not ignoreGeneralValue
    nodesAndOwner = self.getConfigStationNodesAndOwner(xpath)
    return nodesAndOwner
def getConfigPythonClass(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the Python class named by the config value at ``xpath``.

    See convertNodesToPythonClass() for the handling of ``default``.
    """
    foundNodes, owner = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)
    return self.convertNodesToPythonClass(foundNodes, owner, xpath, default = default)
def getConfigStationNodesAndOwner(self, xpath):
    """Delegate the lookup of station config nodes for ``xpath`` upwards.

    The request goes to the parent station. Without parent (root directory of a virtual
    host) it goes to the "virtualHost" environment variable, else to "configuration";
    when neither is set, ``([], self)`` is returned.
    """
    parent = self.getParent()
    if parent is not None:
        return parent.getConfigStationNodesAndOwner(xpath)
    # Root directory of a virtual host.
    virtualHost = environs.getVar("virtualHost", default = None)
    if virtualHost is not None:
        return virtualHost.getConfigStationNodesAndOwner(xpath)
    configuration = environs.getVar("configuration", default = None)
    if configuration is None:
        return [], self
    # This branch used to call getConfigDataHolderNodesAndOwner() with the undefined
    # names "mimeType" and "ignoreGeneralValue" (a NameError at run time). Ask the
    # configuration the same question as every other level instead.
    return configuration.getConfigStationNodesAndOwner(xpath)
def getConfigString(self, xpath, default = "none", ignoreGeneralValue = False):
    """Return the content of the first config node at ``xpath``.

    See convertNodesToString() for the handling of ``default``.
    """
    foundNodes, owner = self.getConfigNodesAndOwner(
        xpath, ignoreGeneralValue = ignoreGeneralValue)
    return self.convertNodesToString(foundNodes, owner, xpath, default = default)
def getContainedFileSystemPath(self):
    """Return the file system path for the stations this one contains: here, its own."""
    ownPath = self.getFileSystemPath()
    return ownPath
def getContentAdministrators(self, contentName):
    """Return the administrators element configured for mode ``contentName``, or None."""
    xpath = """yep:mode[@name = "%s"]/yep:administrators""" % contentName
    administratorsNodes, administratorsNodesOwner = self.getConfigNodesAndOwner(xpath)
    if not administratorsNodes:
        return None
    import elements
    return elements.newElement(administratorsNodes[0], owner = administratorsNodesOwner)
def getContentUsers(self, contentName, command = None):
    """Return the users element of mode ``contentName`` that applies to ``command``.

    No command is looked up as "ACCESS" and HEAD as "GET". The first "yep:users" node
    without "commands" attribute, or whose space-separated attribute lists the command
    name, is wrapped with elements.newElement(). Returns None when there is none.
    """
    usersNodes, usersNodesOwner = self.getConfigNodesAndOwner(
        """yep:mode[@name = "%s"]/yep:users""" % contentName)
    if not usersNodes:
        # By defaut, no action can be done.
        return None
    commandName = command
    if commandName is None:
        commandName = "ACCESS"
    elif commandName == "HEAD":
        commandName = "GET"
    for usersNode in usersNodes:
        commandsNodes = xpaths.evaluateXpath("@commands", contextNode = usersNode)
        if commandsNodes and commandName not in commandsNodes[0].content.split():
            continue
        # When there is no "commands" attribute, it means "all commands".
        import elements
        return elements.newElement(usersNode, owner = usersNodesOwner)
    # By defaut, no action can be done.
    return None
def getDataHolder(self):
    """Return this station when it is a dataholders.DataHolder, else its parent's data holder."""
    import dataholders
    if not isinstance(self, dataholders.DataHolder):
        return self.getParent().getDataHolder()
    return self
def getDirectoryAbsolutePath(self):
    """Return the absolute path of this station's directory: here, the parent's one."""
    parentStation = self.getParent()
    return parentStation.getDirectoryAbsolutePath()
def getDirectoryBaseRelativePath(self):
    """Return the path of this station's directory, relative to the base directory."""
    directoryAbsolutePath = self.getDirectoryAbsolutePath()
    return self.convertAbsolutePathToBaseRelative(directoryAbsolutePath)
def getElementFeature(self, namespaceUri, elementName):
    """Return the feature describing element ``elementName`` of ``namespaceUri``, or None.

    The configuration is searched first: the Python class, the schema path and the
    description path of the first matching config node (each None when absent) make an
    elements.ElementFeature. Failing that, the features registered by Python modules
    are used; None is returned when they raise KeyError or ValueError.
    """
    elementFeature = None
    # Try to get element features from config.
    elementNodes, elementOwner = self.getConfigElementNodesAndOwner(
        namespaceUri, elementName, None, ignoreGeneralValue = True)
    if elementNodes:
        elementNode = elementNodes[0]

        def convertChild(childXpath, convertNodes):
            # Convert the child nodes found at childXpath, or return None without any.
            childNodes = xpaths.evaluateXpath(childXpath, contextNode = elementNode)
            if not childNodes:
                return None
            return convertNodes(childNodes, elementOwner, childXpath, default = None)

        elementClass = convertChild("yep:pythonClass", self.convertNodesToPythonClass)
        schemaAbsolutePath = convertChild("yep:schema", self.convertNodesToAbsolutePath)
        descriptionAbsolutePath = convertChild(
            "yep:description", self.convertNodesToAbsolutePath)
        import elements
        elementFeature = elements.ElementFeature(
            namespaceUri, elementName, elementClass, schemaAbsolutePath,
            descriptionAbsolutePath)
    if elementFeature is not None:
        return elementFeature
    # Otherwise, try to get element feature from those registered by Python modules.
    try:
        return modules.getElementFeature(namespaceUri, elementName)
    except (KeyError, ValueError):
        return None
def getDocument(self):
    """Return the document this station belongs to: here, the one of its parent."""
    parentStation = self.getParent()
    return parentStation.getDocument()
def getFileSystemPath(self):
    """Return the file system path of this station, as given by its parent.

    A parent is required (assertion).
    """
    parentStation = self.getParent()
    assert parentStation is not None
    return parentStation.getContainedFileSystemPath()
def getHttpPostUri(self):
    """Return the URI to post to: here, the URI of this station."""
    postUri = self.getUri()
    return postUri
def getInRawMode(self):
    """Tell whether this station is in raw mode.

    The station's own setting wins when it is not None; otherwise the parent is asked,
    and a station without parent is not in raw mode.
    """
    ownSetting = self._inRawMode
    if ownSetting is not None:
        return ownSetting
    parent = self.getParent()
    if parent is not None:
        return parent.getInRawMode()
    return False
def getParent(self):
    """Return the parent station, or None.

    The explicit parent wins; without one, the previous station is used, except for a
    root elder, which has no parent.
    """
    if self._parent is not None or self.isRootElder:
        return self._parent
    return self.previous
def getSimplestDownloadUrl(self):
    """If a station is downloadable, its content is his source.

    Hence the result of getSimplestSourceUrl().
    """
    sourceUrl = self.getSimplestSourceUrl()
    return sourceUrl
def getSimplestSourceUrl(self):
    """Return the simplest URL that can return the source of the station.

    Return the simplest form. So when both /.../xxx and /.../xxx/source work, it returns
    /.../xxx.
    Return None, when the station has no source.
    """
    # In raw mode the plain URI already serves the source.
    if not self.getInRawMode():
        return self.getActionUri("source")
    return self.getUri()
def getSimplestStyledUrl(self):
    """Return the simplest URL that can return a style X(HT)ML representation of the station.

    Return the simplest form. So when both /.../xxx and /.../xxx/styled work, it returns
    /.../xxx.
    Return None, when the station can not be represented as X(HT)ML.
    """
    # Outside raw mode the plain URI already serves the styled representation.
    if not self.getInRawMode():
        return self.getUri()
    return self.getActionUri("styled")
def getSubPathInternUri(self, subPath):
    """Return the intern path of this station, extended with ``subPath``.

    ``subPath`` must not start with "/" (assertion). None leaves the intern path
    unchanged; any other value is appended after a single "/".
    """
    assert not subPath or subPath[0] != "/"
    internPath = self.getUriInternPath()
    if subPath is None:
        return internPath
    if internPath[-1] == "/":
        return "%s%s" % (internPath, subPath)
    return "%s/%s" % (internPath, subPath)
def getUri(self, quote = False):
    """Returns the station uri within the vhost, including the session token.

    Use this method to get a link to this station from the same vhost.
    Example: /dir/file.xml?sessionToken=123456
    """
    uriAbsolutePath = self.getUriAbsolutePath(quote = quote)
    return self.appendSessionTokenToUrl(uriAbsolutePath)
def getUriAbsolutePath(self, quote = False):
    """Returns the station URL within the vhost.

    Use getUri() rather, in case the Url needs to be appended with a sessionToken.
    Example : "/dir/file.xml"

    The path of the previous station ("/" when there is none) is extended with this
    station's path fragment, URL-quoted when ``quote`` is true.
    """
    previous = self.previous
    if previous is not None:
        uriAbsolutePath = previous.getUriAbsolutePath(quote = quote)
    else:
        uriAbsolutePath = "/"
    fragment = self.uriPathFragment
    if fragment and quote:
        fragment = urllib.quote(fragment)
    if not fragment:
        return uriAbsolutePath
    if uriAbsolutePath[-1] == "/":
        return uriAbsolutePath + fragment
    return "%s/%s" % (uriAbsolutePath, fragment)
def getUriDirectoryStation(self):
    """Returns the "directory" station, as shown in the URI.

    recursive method used by getUriDirectoryAbsolutePath

    The search goes up the chain of previous stations until a URI directory or the root
    elder is found. A station without previous station ends the search as well, instead
    of failing with an AttributeError on None.
    """
    if self.isUriDirectory or self.isRootElder or self.previous is None:
        return self
    return self.previous.getUriDirectoryStation()
def getUriDirectoryAbsolutePath(self):
    """Returns the current directory URI

    Examples :
    "/dir/file.xml" -> "/dir"
    "/dir" -> "/dir"
    """
    directoryStation = self.getUriDirectoryStation()
    return directoryStation.getUriAbsolutePath()
def getUriDirectoryInternPath(self):
    """Return the intern path of the station preceding this one in the URI.

    A station with no such predecessor gets "/", the path getUriInternPath() itself
    starts from, instead of failing with an AttributeError on None.
    """
    uriPrevious = self.getUriPrevious()
    if uriPrevious is None:
        return "/"
    return uriPrevious.getUriInternPath()
def getUriInternPath(self):
    """Return the intern URI path of this station.

    The intern path of the previous station ("/" when there is none) is extended with
    this station's path fragment, never quoted.
    """
    previous = self.previous
    if previous is not None:
        uriInternPath = previous.getUriInternPath()
    else:
        uriInternPath = "/"
    fragment = self.uriPathFragment
    if not fragment:
        return uriInternPath
    if uriInternPath[-1] == "/":
        return uriInternPath + fragment
    return "%s/%s" % (uriInternPath, fragment)
def getUriPrevious(self):
    """Return the closest previous station that is visible in the URI, or None.

    Previous stations without path fragment are skipped, except the first station of
    the chain, which is always accepted.
    """
    previous = self.previous
    if previous is None or previous.uriPathFragment or previous.previous is None:
        return previous
    return previous.getUriPrevious()
def getUsers(self, command = None):
    """Return the configured users element for ``command``, or None.

    No command is looked up as "ACCESS" and HEAD as "GET". The first "yep:users" config
    node without "commands" attribute, or whose attribute contains the command name, is
    wrapped with elements.newElement(). Without such a node, everybody is returned for
    no command, GET and HEAD, and None for any other command.
    """
    commandName = command
    if commandName is None:
        commandName = "ACCESS"
    elif commandName == "HEAD":
        commandName = "GET"
    # When there is no "commands" attribute, it means "all commands".
    xpath = "yep:users[not(@commands) or contains(@commands, '%s')]" \
        % commandName.replace("'", "&apos;")
    usersNodes, usersNodesOwner = self.getConfigNodesAndOwner(xpath)
    if not usersNodes and command not in (None, "GET", "HEAD"):
        return None
    import elements
    if usersNodes:
        return elements.newElement(usersNodes[0], owner = usersNodesOwner)
    return elements.Everybody()
def getWebDavContentLength(self):
    """Return the WebDAV content length of this station: None in this implementation."""
    return None
def getWebDavContentType(self):
    """Return the WebDAV content type of this station: None in this implementation."""
    return None
def getWebDavCreationDate(self):
    """Return the WebDAV creation date of this station: None in this implementation."""
    return None
def getWebDavLastModified(self):
    """Return the WebDAV last modification date of this station: None in this implementation."""
    return None
def isAccessAuthorized(self, command):
    """Tell whether the current user may use ``command`` on this station.

    Access is granted when the users of the command contain the user, or else when its
    administrators do.
    """
    users = self.getUsers(command)
    if users is None or not users.containsUser():
        administrators = self.getAdministrators(command)
        return administrators is not None and administrators.containsUser()
    return True
def isContentAccessAuthorized(self, contentName, command):
    """Tell whether the current user may use ``command`` on the content ``contentName``.

    Access is granted when the users of the content for this command contain the user,
    or else when the administrators of the content do.
    """
    users = self.getContentUsers(contentName, command)
    if users is None or not users.containsUser():
        administrators = self.getContentAdministrators(contentName)
        return administrators is not None and administrators.containsUser()
    return True
def isWebDavCollection(self):
    """Tell whether this station is a WebDAV collection: never, in this implementation."""
    return False
def itemDestroyed(self, item):
    """Called before an item (hierarchical) is destroyed

    Does nothing in this implementation.
    """
    return None
def itemRenamed(self, item, itemOldLocalId):
    """Called after an item (hierarchical) is renamed

    Does nothing in this implementation.
    """
    return None
def outputHttpHtmlDocument(self, htmlDocument, stylesheet = None):
    """Send ``htmlDocument``, styled by ``stylesheet`` when given, as the HTTP response.

    The URI of this station is given as the content location.
    """
    mimeType, data = self.generateXmlDocumentStyledData(htmlDocument, stylesheet)
    httpRequestHandler = environs.getVar("httpRequestHandler")
    httpRequestHandler.outputData(
        data, mimeType = mimeType, contentLocation = self.getUri())
def outputHttpDownload(self):
    """Send the download of this station. There is none here: raises faults.PathNotFound."""
    raise faults.PathNotFound("")
def outputHttpSource(self):
    """Send the source of this station. There is none here: raises faults.PathNotFound."""
    raise faults.PathNotFound("")
def replyToHttpPropfindRequest(self, requestDocument, depth):
    """Answer a PROPFIND request with a 207 multistatus response.

    The root element of ``requestDocument`` fills a new multistatus for this station
    and ``depth``; the serialized result is logged and sent as "text/xml".
    """
    import webdav
    multistatus = webdav.Multistatus.newInTemporaryDocument()
    requestRoot = requestDocument.getRootElement()
    requestRoot.generateWebDavResponse(self, multistatus, depth)
    data = multistatus.getDocument().serialize()
    logs.debug("\nPROPFIND response =\n%s" % data)
    httpRequestHandler = environs.getVar("httpRequestHandler")
    httpRequestHandler.outputData(
        data, contentLocation = None, mimeType = "text/xml", successCode = 207)
def source(self):
    """Handles HTTP GET & PUT.

    GET outputs the source and PUT is handed to doHttpPut(); any other command of the
    current environment raises faults.PathNotFound.
    """
    command = environs.getVar("httpCommand")
    if command not in ("GET", "PUT"):
        raise faults.PathNotFound("")
    if command == "PUT":
        self.doHttpPut()
    else:
        self.outputHttpSource()
def styled(self):
    """Handles HTTP GET.

    The generated XML document, when there is one, is output styled with the site XSLT
    of the data holder. Any other command raises faults.PathNotFound.
    """
    if environs.getVar("httpCommand") != "GET":
        raise faults.PathNotFound("")
    xmlDocument = self.generateXmlDocument()
    if not xmlDocument:
        return
    siteStylesheet = self.getDataHolder().getSiteXslt()
    self.outputHttpHtmlDocument(xmlDocument, siteStylesheet)
def unstyled(self):
    """Handles HTTP GET.

    The generated XML document, when there is one, is output without stylesheet. Any
    other command raises faults.PathNotFound.
    """
    if environs.getVar("httpCommand") != "GET":
        raise faults.PathNotFound("")
    # The document must be generated before it is tested: the test used to come first
    # and failed on the not yet assigned variable.
    xmlDocument = self.generateXmlDocument()
    if xmlDocument:
        self.outputHttpHtmlDocument(xmlDocument)
def walk(self, uriPathFragments, command = None, instruction = None):
    """Walk from this station along ``uriPathFragments`` and run the command at the end.

    Access to this station must have been checked before the call. When the fragments
    lead to another station, the walk goes on there, after an access check. Otherwise
    requests whose "Referer" header is a configured bad referrer are refused, and the
    method handling ``command`` (doHttpAccess when None, doHttpGet for GET and HEAD,
    ...) is called through walkToFunction(), whose result is returned. An unknown
    command raises an Exception.
    """
    # Don't check access to self: it must have been done before call.
    # NOTE(review): presumably "stations" is this very module, imported here to reach
    # its module-level _depth counter.
    import stations
    logs.info("%s%s %s" % (
        " " * stations._depth, self.__class__.__name__, '/'.join(uriPathFragments)))
    stations._depth += 1
    # The finally clause restores the log depth on every exit, including faults and
    # the bad referrer refusal, which used to leave it incremented for good.
    try:
        # If there is a content, check that it can be accessed, then walk to it.
        if uriPathFragments:
            content, contentUriPathFragments = self.walkToContent(
                uriPathFragments, command, instruction)
            assert content is not None
            if content != self:
                return content.checkAccessAndWalk(
                    contentUriPathFragments, command, instruction)
            uriPathFragments = contentUriPathFragments
        # There is no content. The walk is finished (at least, it seems so) => call the
        # default function.
        # but first we may want to forbid a few referrers
        bad_referrers = self.getConfigList("yep:badReferrers/yep:host", default = [])
        httpRequestHandler = environs.getVar("httpRequestHandler")
        if httpRequestHandler.headers.get("Referer", "") in bad_referrers:
            return httpRequestHandler.outputErrorAccessForbidden(self.getUriAbsolutePath())
        functionName = {
            None: "doHttpAccess",
            "DELETE": "doHttpDelete",
            "GET": "doHttpGet",
            "HEAD": "doHttpGet",
            "MKCOL": "doHttpMkcol",
            "OPTIONS": "doHttpOptions",
            "POST": "doHttpPost",
            "PROPFIND": "doHttpPropfind",
            "PUT": "doHttpPut",
            }.get(command)
        if functionName is None:
            raise Exception('Unknown HTTP command "%s"' % command)
        # A station lacking the method gets None, which walkToFunction() reports.
        function = getattr(self, functionName, None)
        if function is not None:
            # Logging function.__name__ unconditionally crashed when it was None.
            logs.info('%s%s()' % (" " * stations._depth, function.__name__))
        return self.walkToFunction(function, uriPathFragments, command, instruction)
    finally:
        stations._depth -= 1
def walkToAction(self, uriPathFragments, command = None, instruction = None):
    """Resolve the first path fragment to a method of self, if there is one.

    Returns (station, remainingFragments): an EasyFunctionStation wrapping the method and the
    fragments after the first one when self has a method named like the first fragment;
    otherwise (self, uriPathFragments) unchanged.
    """
    actionName = uriPathFragments[0]
    candidate = None
    if hasattr(self, actionName):
        candidate = getattr(self, actionName)
    if not isinstance(candidate, types.MethodType):
        # Missing attribute, or an attribute that is not a method: nothing to walk to.
        return self, uriPathFragments
    actionStation = EasyFunctionStation(
        actionName, candidate, previous = self, uriPathFragment = actionName)
    return actionStation, uriPathFragments[1:]
def walkToContent(self, uriPathFragments, command = None, instruction = None):
    """Resolve the next fragments to a content station: an item first, else an action.

    Returns the (station, remainingFragments) pair given by walkToItem() when it found
    another station or changed the fragments; otherwise the pair given by walkToAction().
    """
    target, remainingFragments = self.walkToItem(uriPathFragments, command, instruction)
    assert target is not None
    itemFound = target != self or remainingFragments != uriPathFragments
    if not itemFound:
        # walkToItem() left everything untouched: try the fragments as an action instead.
        target, remainingFragments = self.walkToAction(
            uriPathFragments, command, instruction)
        assert target is not None
    return target, remainingFragments
def walkToFunction(self, function, uriPathFragments, command = None, instruction = None):
    """End the walk on function: probe it or call it, depending on instruction.

    function is the handler to use, or None when no handler was found. uriPathFragments are
    the remaining path fragments; their count must fit the positional arguments of function
    (presumably callHttpFunction() passes them as positional arguments -- TODO confirm).

    With instruction "exists" or "existsAndIsAuthorized", False is returned where
    faults.PathNotFound would otherwise be raised. With "existsAndIsAuthorized" or
    "isAuthorized", False is returned where an authorization fault would otherwise be raised.
    With any of these three instructions function is never called: True is returned once all
    checks have passed.

    Without such an instruction, returns self when command is None, else the result of
    self.callHttpFunction().

    Raises faults.PathNotFound when function is None, when the number of fragments does not
    fit function, or when the call raises TypeError. Raises faults.PathUnauthorized when
    self.isAccessAuthorized(command) is false. Faults PathNotFound, PathForbidden and
    PathUnauthorized raised by the call are logged, then re-raised.
    """
    if function is None:
        if instruction in ("exists", "existsAndIsAuthorized"):
            return False
        raise faults.PathNotFound("/".join(uriPathFragments))
    # Check the number of arguments of function.
    functionCode = function.func_code
    # Bit 4 of co_flags is set when the function is declared with *arguments; in that case any
    # number of fragments is accepted.
    acceptsArbitraryPositionalArguments = functionCode.co_flags & 4 != 0
    if not acceptsArbitraryPositionalArguments:
        argumentsCount = len(uriPathFragments)
        if isinstance(function, types.MethodType):
            argumentsCount += 1 # For argument "self".
        maxArgumentsCount = functionCode.co_argcount
        minArgumentsCount = maxArgumentsCount
        # Arguments that have a default value are optional.
        if function.func_defaults is not None:
            minArgumentsCount -= len(function.func_defaults)
        if argumentsCount < minArgumentsCount or argumentsCount > maxArgumentsCount:
            if instruction in ("exists", "existsAndIsAuthorized"):
                return False
            raise faults.PathNotFound("/".join(uriPathFragments))
    # Without a command there is nothing to call and no access check is made.
    if command is None:
        if instruction in ("exists", "existsAndIsAuthorized", "isAuthorized"):
            return True
        return self
    if not self.isAccessAuthorized(command):
        if instruction in ("existsAndIsAuthorized", "isAuthorized"):
            return False
        raise faults.PathUnauthorized("/".join(uriPathFragments))
    if instruction in ("exists", "existsAndIsAuthorized", "isAuthorized"):
        # Note: In some cases, it will answer True, but the real call (with instruction ==
        # None) may raise a fault.
        return True
    try:
        return self.callHttpFunction(function, uriPathFragments, command, instruction)
    except TypeError:
        # Typically "function takes exactly n arguments (t given)". Act as if the function
        # wasn't found. Any other TypeError raised during the call is handled the same way.
        logs.exception("A TypeError exception occured during call of %s(%s)" % (
            function, uriPathFragments))
        if instruction in ("exists", "existsAndIsAuthorized"):
            return False
        raise faults.PathNotFound("/".join(uriPathFragments))
    except faults.PathNotFound, fault:
        logs.exception("Fault %s occured while calling %s(%s)" % (
            fault, function, uriPathFragments))
        if instruction in ("exists", "existsAndIsAuthorized"):
            return False
        raise
    except (faults.PathForbidden, faults.PathUnauthorized), fault:
        logs.exception("Fault %s occured while calling %s(%s)" % (
            fault, function, uriPathFragments))
        if instruction in ("existsAndIsAuthorized", "isAuthorized"):
            return False
        raise
def walkToItem(self, uriPathFragments, command = None, instruction = None):
    """Default item lookup: this station has no items.

    Returns (self, uriPathFragments) unchanged, which tells walkToContent() that no item was
    found.
    """
    nothingFound = (self, uriPathFragments)
    return nothingFound
def walkToLocation(self, location, command = None, instruction = None, submission = None,
        **keywords):
    """Walk to location, which may be relative to this station, and return the walk result.

    A local location is converted with self.constructUriInternPath() and walked from the root
    station. Any other location is walked through a remote DataHolder whose MIME type is
    looked up from the extension of location.

    During the walk, submission is pushed in the environment; when it is None, a
    FakeSubmission built from the extra keyword arguments is used instead. Keyword arguments
    must not be given together with a submission.
    """
    if not locations.isLocalUri(location):
        import dataholders
        extension = os.path.splitext(location)[1]
        mimeType = None
        if extension in dataholders.mimeTypes:
            mimeType = dataholders.mimeTypes[extension]
        startStation = dataholders.DataHolder(
            isRemote = True, remoteUri = location, mimeType = mimeType)
        fragments = []
    else:
        startStation = environs.getVar("rootStation")
        internPath = self.constructUriInternPath(location)
        fragments = [part for part in internPath.split("/") if part]
    if submission is not None:
        assert not keywords
    else:
        import submissions
        submission = submissions.FakeSubmission(keywords)
    environs.push(_level = "walkToLocation", submission = submission)
    try:
        return startStation.walk(fragments, command, instruction)
    finally:
        # Always drop the environment level pushed above, even when the walk raises.
        environs.pull(_level = "walkToLocation")
class AbstractContext(AbstractStation):
    """Station that carries a prototype and a specimen.

    Both attributes default to None at class level; the constructor only overrides those
    given as non-None.
    """

    prototype = None
    specimen = None

    def __init__(self, prototype, specimen, previous = None, uriPathFragment = None):
        super(AbstractContext, self).__init__(
            previous = previous, uriPathFragment = uriPathFragment)
        # A None value leaves the class-level default in place.
        for attributeName, value in (("prototype", prototype), ("specimen", specimen)):
            if value is not None:
                setattr(self, attributeName, value)
class EasyFunctionStation(AbstractStation):
    """Station that publishes a single function under an action name.

    Access is decided by the previous station, and walking to this station goes straight to
    the wrapped function.
    """

    actionName = None
    firstWalkDone = False
    function = None
    uriPathFragments = ()

    def __init__(self, actionName, function, *arguments, **keywords):
        # Remaining arguments are handed to AbstractStation unchanged.
        super(EasyFunctionStation, self).__init__(*arguments, **keywords)
        assert actionName is not None
        self.actionName = actionName
        assert function is not None
        self.function = function

    def __repr__(self):
        stationClass = self.__class__
        return "%s.%s.%s.%s%s" % (
            stationClass.__module__,
            stationClass.__name__,
            self.actionName,
            self.function,
            self.uriPathFragments)

    def isAccessAuthorized(self, command):
        # The previous station decides who may use its actions.
        owner = self.previous
        return owner.isContentAccessAuthorized(self.actionName, command)

    def walk(self, uriPathFragments, command = None, instruction = None):
        # No content lookup here: the remaining fragments go to the wrapped function.
        return self.walkToFunction(
            self.function, uriPathFragments, command, instruction)
class HolderConstructorStation(AbstractStation):
    """Station that creates a temporary XML holder from an element feature when walked.

    Access is decided by the previous station.
    """

    actionName = None
    elementFeature = None

    def __init__(self, actionName, elementFeature, *arguments, **keywords):
        # Remaining arguments are handed to AbstractStation unchanged.
        super(HolderConstructorStation, self).__init__(*arguments, **keywords)
        assert actionName is not None
        self.actionName = actionName
        assert elementFeature is not None
        self.elementFeature = elementFeature

    def __repr__(self):
        stationClass = self.__class__
        return "%s.%s(%s, %s)" % (
            stationClass.__module__,
            stationClass.__name__,
            self.actionName,
            self.elementFeature)

    def isAccessAuthorized(self, command):
        # The previous station decides who may use its actions.
        owner = self.previous
        return owner.isContentAccessAuthorized(self.actionName, command)

    def walk(self, uriPathFragments, command = None, instruction = None):
        # Build a temporary holder under the previous station, then continue the walk in it.
        temporaryHolder = self.elementFeature.newXmlHolder(
            self.previous, uriPathFragment = self.actionName, temporary = True)
        return temporaryHolder.checkAccessAndWalk(uriPathFragments, command, instruction)
class XslTransformStation(AbstractStation):
    """Station that applies an XSLT file to the XML document generated by its parent."""

    def __init__(self, xsltFilePathNode, nodeOwner, *arguments, **keywords):
        """Read the transformation settings from xsltFilePathNode.

        The node content is the stylesheet location, made absolute through nodeOwner; its
        "mimeType" and "mode" properties are stored as is. Remaining arguments are handed to
        AbstractStation.
        """
        super(XslTransformStation, self).__init__(*arguments, **keywords)
        self.xsltFilePath = \
            nodeOwner.convertRelativeLocationToAbsolutePath(xsltFilePathNode.content)
        self.mimeType = xsltFilePathNode.prop("mimeType")
        self.mode = xsltFilePathNode.prop("mode")

    def transformation(self, uriPathFragments):
        """Handles HTTP GET: transforms the parent's XML document and outputs the result.

        In "raw" mode the serialized result is output with self.mimeType; otherwise the result
        document is output through outputHttpHtmlDocument() with the site XSLT.

        Raises faults.PathNotFound for any HTTP command other than GET.
        """
        command = environs.getVar("httpCommand")
        if command != "GET":
            raise faults.PathNotFound("")
        xmlDocument = self.getParent().generateXmlDocument()
        rawMode = self.mode == "raw"
        stylesheetStyle = libxslt.parseStylesheetFile(self.xsltFilePath)
        try:
            styledDoc = stylesheetStyle.applyStylesheet(xmlDocument.node,
                    environs.getVar("xsltParams", None))
            if rawMode:
                # Only raw mode uses the serialized text. The result document is not needed
                # afterwards, so it is freed here: the libxml2 bindings do not free documents
                # by themselves.
                try:
                    data = styledDoc.serialize()
                finally:
                    styledDoc.freeDoc()
        finally:
            # Free the compiled stylesheet even when the transformation fails.
            stylesheetStyle.freeStylesheet()
        if rawMode:
            environs.getVar("httpRequestHandler").outputData(data, mimeType = self.mimeType)
        else:
            # NOTE(review): styledDoc is handed over without being freed here -- confirm that
            # outputHttpHtmlDocument() takes care of it.
            self.outputHttpHtmlDocument(styledDoc, self.getParent().getDataHolder().getSiteXslt())

    def walk(self, uriPathFragments, command = None, instruction = None):
        # No content lookup here: the remaining fragments go to the transformation.
        return self.walkToFunction(self.transformation, uriPathFragments, command, instruction)
def walkToLocation(location, command = None, instruction = None, submission = None, **keywords):
    """Walk to location from outside any station and return the walk result.

    A local location must be an absolute path and is walked from the root station. Any other
    location is walked through a remote DataHolder whose MIME type is looked up from the
    extension of location.

    During the walk, submission is pushed in the environment; when it is None, a
    FakeSubmission built from the extra keyword arguments is used instead. Keyword arguments
    must not be given together with a submission.
    """
    if not locations.isLocalUri(location):
        import dataholders
        extension = os.path.splitext(location)[1]
        mimeType = None
        if extension in dataholders.mimeTypes:
            mimeType = dataholders.mimeTypes[extension]
        startStation = dataholders.DataHolder(
            isRemote = True, remoteUri = location, mimeType = mimeType)
        fragments = []
    else:
        startStation = environs.getVar("rootStation")
        # There is no station here to resolve a relative path against.
        assert location and location[0] == "/", \
            'Location "%s" is not an absolute path. Use method Station.walkToLocation instead.' \
            % location
        fragments = [part for part in location.split("/") if part]
    if submission is not None:
        assert not keywords
    else:
        import submissions
        submission = submissions.FakeSubmission(keywords)
    environs.push(_level = "walkToLocation", submission = submission)
    try:
        return startStation.walk(fragments, command, instruction)
    finally:
        # Always drop the environment level pushed above, even when the walk raises.
        environs.pull(_level = "walkToLocation")