989 lines
40 KiB
Python
989 lines
40 KiB
Python
# -*- coding: UTF-8 -*-
|
|
|
|
|
|
# Expression
|
|
# By: Frederic Peters <fpeters@entrouvert.com>
|
|
# Emmanuel Raviart <eraviart@entrouvert.com>
|
|
#
|
|
# Copyright (C) 2004 Entr'ouvert, Frederic Peters & Emmanuel Raviart
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
|
|
|
|
"""Elements Module
|
|
|
|
Elements are wrappers around libxml2 nodes.
|
|
Every element belongs to an XML holder (which is a wrapper around libxml2
|
|
docs).
|
|
"""
|
|
|
|
|
|
import os
|
|
|
|
import libxml2
|
|
|
|
import environs
|
|
import faults
|
|
import filesystems
|
|
import locations
|
|
import logs
|
|
import modules
|
|
import namespaces
|
|
import nodes
|
|
import stations
|
|
import xpaths
|
|
|
|
|
|
class Element(stations.AbstractStation, nodes.NodeWrapper):
|
|
_description = None
|
|
_schema = None # Cache for XML schema
|
|
dummyNodes = None
|
|
# The nodes containing the namespace definitions managed by this
|
|
# element
|
|
owner = None
|
|
# The element or document which owns self.node.
|
|
# When not None, the node is not managed by self => node should not be freed on delete.
|
|
# When owner is "external", it means the node is managed by something unknown (either a
|
|
# libxml2 document not created by Expression (for example, a document created by applying
|
|
# an XSLT) or an unknown expression element or ...).
|
|
translatable = False
|
|
# whether contents can be translated
|
|
translated = False
|
|
# whether the element was already translated.
|
|
# Initially not, unless an ancestor element already was (see __init__).
|
|
|
|
def __del__(self):
    """Release the libxml2 nodes this element is responsible for.

    The wrapped node is freed only when no owner is recorded (self.owner is
    None). The dummy nodes holding namespace definitions are freed whenever
    the element has any.
    """
    if self.node is not None:
        if self.owner is None:
            # No owner recorded: this element manages the node's memory.
            self.node.freeNode()
        del self.node
    namespaceHolders = self.dummyNodes
    if namespaceHolders is not None:
        for namespaceHolder in namespaceHolders:
            namespaceHolder.freeNode()
        del self.dummyNodes
|
|
|
|
def __init__(self, node = None, previous = None, parent = None, uriPathFragment = None,
        owner = "undefined"):
    """Wrap an existing libxml2 element node, or create a new one.

    node may be:
      - None: a new node is created, using the namespace URI and name that
        modules.getElementClassNamespaceUriAndName() returns for this class;
      - a (namespacePrefix, namespaceUri, name) list or tuple: a new node is
        created from that triplet;
      - an existing node of type "element": it is wrapped as is, and owner
        must then be passed explicitly.

    previous, parent and uriPathFragment are handed to
    stations.AbstractStation.__init__().

    owner is stored in self.owner when it is neither None nor the
    "undefined" placeholder.

    Raises AssertionError when the node description is invalid or when an
    existing node is wrapped without an explicit owner.
    """
    stations.AbstractStation.__init__(
        self, previous = previous, parent = parent, uriPathFragment = uriPathFragment)
    if node is None:
        namespaceUri, name = modules.getElementClassNamespaceUriAndName(self.__class__)
        assert name, "Node %s in namespace URI %s has no name." % (self, namespaceUri)
        node = libxml2.newNode(name)
        node.setNs(self.newNamespace(namespaceUri))
    elif isinstance(node, (list, tuple)):
        # The triplet is wrapped in a 1-tuple: '"%s" % node' would spread a
        # tuple over the format string and raise TypeError instead of the
        # intended AssertionError.
        assert len(node) == 3, "Wrong node triplet = %s" % (node,)
        namespacePrefix = node[0]
        namespaceUri = node[1]
        name = node[2]
        node = libxml2.newNode(name)
        node.setNs(self.newNamespace(namespaceUri, namespacePrefix))
    else:
        assert node.type == "element", "Node %s of type %s is not an element." % (
            node, node.type)
        assert node.name, "Node %s of type %s has no name." % (node, node.type)
        assert owner != "undefined", 'Missing parameter "owner".'
    # The "undefined" placeholder only exists to detect a missing argument;
    # it is never stored, so a freshly created node keeps owner None and
    # stays managed by this element.
    # NOTE(review): the indentation of this file was lost, so it is unclear
    # whether the original assignment applied to created nodes as well --
    # confirm against the upstream source.
    if owner is not None and owner != "undefined":
        self.owner = owner
    nodes.NodeWrapper.__init__(self, node)
    parent = self.getParent()
    if parent is not None and isinstance(parent, Element):
        # An element below an already translated ancestor counts as translated.
        self.translated = parent.translated
    if self.translatable and not self.translated:
        self.translate()
|
|
|
|
def absorbNode(self, element, isDirectElement = True):
    """Take over the node of element so that it can be inserted in self.

    element keeps its node, but its owner attribute is changed so that it no
    longer manages the node's memory. When self itself has an owner (other
    than "external"), the request is forwarded to that owner.

    Returns the libxml2 node of element.
    """
    if self.owner not in (None, "external"):
        return self.owner.absorbNode(element, isDirectElement = False)
    assert isinstance(element, Element)
    assert element.owner in (None, self), \
        "Trying to absorb an already absorbed node = %s" % element
    if self.owner == "external":
        # Because of reconciliateNs (which will be executed ASAP, typically
        # during append), the namespaces defined in element.dummyNodes are not
        # used: they can be freed together with element, so there is no need
        # to move them to self.
        element.owner = "external"
        return element.node
    element.owner = self
    if self.node.doc is None and element.dummyNodes is not None:
        # self is not in a document yet: keep the namespace holders of
        # element alive by moving them to self.
        movedHolders = element.dummyNodes
        if self.dummyNodes is None:
            self.dummyNodes = movedHolders
        else:
            self.dummyNodes += movedHolders
        del element.dummyNodes
    # When self.node.doc is set, the same reasoning as in the "external" case
    # applies and the dummy nodes stay with element.
    return element.node
|
|
|
|
def append(self, item):
    """Append item to the wrapped node.

    item may be an Element (its node is absorbed first), a libxml2 node, a
    unicode string (encoded to UTF-8), a byte string, or None (ignored).
    Any other value raises Exception.
    """
    if isinstance(item, Element):
        # Turn the element into its node; the node is handled just below.
        item = self.absorbNode(item)
    if isinstance(item, libxml2.xmlCore):
        self.node.addChild(item)
        document = self.node.doc
        if document is not None:
            document.getRootElement().reconciliateNs(document)
        return
    if isinstance(item, unicode):
        self.node.addContent(item.encode("UTF-8"))
        return
    if isinstance(item, str):
        self.node.addContent(item)
        return
    if item is not None:
        raise Exception("Wrong item = %s (of type %s)" % (item, type(item)))
|
|
|
|
def createChildElementNode(self, name, node = None, content = None):
    """Create a child element of node (self.node by default) and return it.

    name may take one of these forms:
      - name[positionNumber]
      - (name|alternateName)
      - (name|alternateName)[positionNumber]

    A namespace prefix in name is resolved against the node first, then
    against the prefixes known to the namespaces module.

    content, when given, becomes the content of the new child.
    """
    if node is None:
        node = self.node
    positionNumber = 1
    if name[-1] == "]":
        # Split off the [positionNumber] suffix and remember the position.
        openBracket = name.index("[")
        closeBracket = name.index("]")
        positionNumber = int(name[openBracket + 1:closeBracket])
        name = name[:openBracket]
    nameWithAlternatives = name
    if name.startswith("("):
        # Keep the first name of "(name|alternateName)".
        name = name[1:].split("|", 1)[0]

    # Find the namespace of the new element.
    namespace = None
    if ":" in name:
        namespacePrefix, name = name.split(":")
        try:
            namespace = node.searchNs(node.doc, namespacePrefix)
        except libxml2.treeError:
            # The prefix is not declared around node.
            try:
                namespaceUri = namespaces.getUri(namespacePrefix)
            except KeyError:
                pass
            else:
                try:
                    # The URI may be declared under another prefix or as the
                    # default namespace.
                    namespace = node.searchNsByHref(node.doc, namespaceUri)
                except libxml2.treeError:
                    # Not declared yet: declare it on node.
                    namespace = node.newNs(namespaceUri, namespacePrefix)

    # Create the element with its content; it is added as the last child.
    elementNode = node.newTextChild(namespace, name, content)

    # Move it to the requested position among the matching siblings.
    siblingNodes = self.evaluateXpath(nameWithAlternatives, node)
    siblingCount = len(siblingNodes)
    if siblingCount > positionNumber:
        # The new node must come earlier: put it before the sibling that
        # currently holds the requested position.
        if positionNumber >= 1:
            siblingNodes[positionNumber - 1].addPrevSibling(elementNode)
    elif positionNumber > siblingCount:
        # The new node must come later: pad with empty copies in front of it.
        for _padding in range(positionNumber - siblingCount):
            elementNode.addPrevSibling(elementNode.copyNode(False))
    return elementNode
|
|
|
|
def createDescendantElementNode(self, xpath, node = None, content = None):
    """Create the element located at xpath below node and return it.

    node defaults to self.node. For every intermediate step of xpath, the
    first matching node is reused, or a new one is created when none matches.

    content, when given, becomes the content of the final element.
    """
    if node is None:
        node = self.node
    if "/" not in xpath:
        return self.createChildElementNode(xpath, node, content)
    # Resolve (or create) the first step, then recurse on the remainder.
    firstStep, remainingXpath = xpath.split("/", 1)
    matchingNodes = self.evaluateXpath(firstStep, node)
    if matchingNodes:
        stepNode = matchingNodes[0]
    else:
        stepNode = self.createChildElementNode(firstStep, node, None)
    return self.createDescendantElementNode(remainingXpath, stepNode, content)
|
|
|
|
def customizeDescriptionPrototype(self, descriptionPrototype):
    """Hook for adjusting the description prototype; does nothing here.

    Subclasses may override this method to modify their description
    prototype, for instance to populate form controls dynamically or to add
    user-dependent restrictions. getDescription() calls it before building
    the description context.
    """
    return None
|
|
|
|
def doHttpGet(self):
|
|
description = self.getDescription()
|
|
if description is not None:
|
|
try:
|
|
return description.checkAccessAndWalk(
|
|
[], environs.getVar("httpCommand"), environs.getVar("instruction"))
|
|
except (faults.PathForbidden, faults.PathNotFound, faults.PathUnauthorized), fault:
|
|
logs.exception("Ignoring fault %s in %s.checkAccessAndWalk()" % (
|
|
fault, description))
|
|
return super(Element, self).doHttpGet()
|
|
|
|
def doHttpPost(self):
|
|
description = self.getDescription()
|
|
if description is not None:
|
|
try:
|
|
return description.checkAccessAndWalk(
|
|
[], environs.getVar("httpCommand"), environs.getVar("instruction"))
|
|
except (faults.PathForbidden, faults.PathNotFound, faults.PathUnauthorized), fault:
|
|
logs.exception("Ignoring fault %s in %s.checkAccessAndWalk()" % (
|
|
fault, description))
|
|
return super(Element, self).doHttpPost()
|
|
|
|
def generatePlainText(self):
    """Return the plain text of this element, using a context built on itself."""
    ownContext = self.newContext(self, previous = self)
    return self.generatePlainTextContext(ownContext)
|
|
|
|
def generatePlainTextContext(self, context):
    """Return the text of the child nodes, concatenated as a unicode string.

    Text and CDATA children contribute their content decoded from UTF-8;
    element children contribute the plain text generated through their own
    context; every other kind of child is skipped.
    """
    # Kept from the original code, which imports this module here without
    # using the name afterwards.
    import elements
    fragments = []
    child = self.node.children
    while child is not None:
        childType = child.type
        if childType in ("cdata", "text"):
            fragments.append(child.content.decode("UTF-8"))
        elif childType == "element":
            childElement = self.newElement(child)
            childContext = childElement.newContext(context.specimen, previous = context)
            fragments.append(childContext.generatePlainText())
        child = child.next
    return u"".join(fragments)
|
|
|
|
def generateXml(self, layout):
    """Generate this element into layout, using a context built on itself.

    Returns whatever generateXmlContext() returns.
    """
    ownContext = self.newContext(self, previous = self)
    return self.generateXmlContext(ownContext, layout)
|
|
|
|
def generateXmlContext(self, context, layout):
    """Generate the XML of this element into layout.

    When the element has a description, generation is delegated to it and
    its result is returned. Otherwise a copy of the element is appended to
    layout, its attributes and children are generated into the copy, and
    True is returned.
    """
    description = self.getDescription()
    if description is not None:
        return description.generateXml(layout)
    elementCopy = self.generateXmlContextElement(context, layout)
    self.generateXmlContextAttributes(context, elementCopy)
    self.generateXmlContextChildren(context, elementCopy)
    return True
|
|
|
|
def generateXmlContextAttributes(self, context, newElement):
    """Copy the attributes of the wrapped node onto newElement.node.

    Each attribute is re-created by name and content with newProp().
    Returns True when at least one attribute was copied, False otherwise.
    """
    copiedAny = False
    currentAttribute = self.node.properties
    while currentAttribute is not None:
        attributeName = currentAttribute.name
        attributeContent = currentAttribute.content
        newElement.node.newProp(attributeName, attributeContent)
        copiedAny = True
        currentAttribute = currentAttribute.next
    return copiedAny
|
|
|
|
def generateXmlContextChildren(self, context, layout):
    """Generate the children of the wrapped node into layout.

    Non-element children are appended as deep copies. Element children are
    wrapped and generated through their own context.

    Returns True when something was added to layout, False otherwise.
    """
    filled = False
    child = self.node.children
    while child is not None:
        if child.type == "element":
            childElement = self.newElement(child)
            childContext = childElement.newContext(context.specimen, previous = context)
            if childContext.generateXml(layout):
                filled = True
        else:
            layout.append(child.copyNode(True))
            filled = True
        child = child.next
    return filled
|
|
|
|
def generateXmlContextElement(self, context, layout):
    """Append to layout a new, empty element with the same qualified name.

    The class of the new element is the one registered in modules for this
    namespace URI and name. The new element is created without owner.

    Returns the new element.
    """
    namespaceUri = self.getNamespaceUri()
    nodeName = self.node.name
    nodeTriplet = (self.getNamespacePrefix(), namespaceUri, nodeName)
    elementClass = modules.getElementClass(namespaceUri, nodeName)
    elementCopy = elementClass(node = nodeTriplet, owner = None)
    layout.append(elementCopy)
    return elementCopy
|
|
|
|
def getConfigNodesAndOwner(self, xpath, ignoreGeneralValue = False):
    """Look up the configuration nodes at xpath for this kind of element.

    Returns the result of getConfigElementNodesAndOwner() for the namespace
    URI and name of the wrapped node, or ([], self) when the node has no
    namespace.
    """
    elementNamespaceUri = self.getNamespaceUri()
    elementName = self.node.name
    if elementNamespaceUri is None:
        return [], self
    assert elementNamespaceUri and elementName, \
        'Invalid namespace URI "%s" or name "%s" for %s' % (
            elementNamespaceUri, elementName, self)
    return self.getConfigElementNodesAndOwner(
        elementNamespaceUri, elementName, xpath, ignoreGeneralValue = ignoreGeneralValue)
|
|
|
|
def getContentAtXpath(self, xpath, contextNode = None):
    """Return the content of the single node located at xpath.

    Example: emailAddress = myUser.getContentAtXpath("yep:person/yep:email")

    Returns None when the node has no children (empty element).
    Raises Exception unless exactly one node matches xpath.
    """
    matchingNodes = self.evaluateXpath(xpath, contextNode)
    if not matchingNodes:
        raise Exception("Node not found at %s" % xpath)
    if len(matchingNodes) > 1:
        raise Exception("Several nodes found at %s" % xpath)
    onlyNode = matchingNodes[0]
    if onlyNode.children:
        return onlyNode.content
    return None
|
|
|
|
def getDescription(self):
    """Return the description context of this element, or None.

    The result is cached in self._description; the string "none" is cached
    when there is no description. The description file is searched, in
    order: in the "description" attribute of the element, in the
    "yep:description" configuration entry, then in the element feature
    registered in modules. None is returned when descriptions are disabled
    by the "yep:useDescriptions" configuration entry or when the xforms
    descriptions module cannot be imported.
    """
    if self._description is None:
        if not self.getConfigBoolean("yep:useDescriptions", default = True):
            self._description = "none"
            return None
        try:
            import expression.modules.xforms.descriptions as descriptions
        except ImportError:
            self._description = "none"
        else:
            absolutePath = None
            # First choice: a description attribute on the element itself.
            attributeNodes = self.evaluateXpath("@description")
            if attributeNodes:
                absolutePath = self.convertNodesToAbsolutePath(
                    attributeNodes, self, "@description", default = None)
            # Second choice: a description element in the configuration.
            if absolutePath is None:
                absolutePath = self.getConfigAbsolutePath(
                    "yep:description", default = None, ignoreGeneralValue = True)
            # Last choice: a description declared by a Python module.
            if absolutePath is None:
                try:
                    feature = modules.getElementFeature(
                        self.getNamespaceUri(), self.node.name)
                except KeyError:
                    pass
                else:
                    absolutePath = feature.descriptionAbsolutePath

            if absolutePath is None:
                self._description = "none"
            else:
                holder = descriptions.getDescriptionHolder(absolutePath)
                prototype = holder.getRootElement()
                self.customizeDescriptionPrototype(prototype)
                self._description = prototype.newContext(self, previous = self)
    cachedDescription = self._description
    if cachedDescription == "none":
        return None
    return cachedDescription
|
|
|
|
def getDocument(self):
    """Return the document this element belongs to.

    The request goes to the parent when the element has no owner, and to the
    owner otherwise. Raises Exception for an externally owned element.
    """
    elementOwner = self.owner
    if elementOwner is None:
        return self.getParent().getDocument()
    if elementOwner == "external":
        raise Exception("Unknown document for element %s." % self)
    return elementOwner.getDocument()
|
|
|
|
def getId(self):
    """Return the content of the "id" attribute, or None when it is absent."""
    idNodes = self.evaluateXpath("@id")
    if not idNodes:
        return None
    return idNodes[0].content
|
|
|
|
def getNamespacePrefix(self):
    """Return the namespace prefix of the wrapped node, or None without namespace."""
    try:
        nodeNamespace = self.node.ns()
        return nodeNamespace.name
    except libxml2.treeError:
        # The libxml2 Python binding raises this exception when xmlNs returns None.
        return None
|
|
|
|
def getNamespaceUri(self):
    """Return the namespace URI of the wrapped node, or None without namespace."""
    try:
        nodeNamespace = self.node.ns()
        return nodeNamespace.content
    except libxml2.treeError:
        # The libxml2 Python binding raises this exception when xmlNs returns None.
        return None
|
|
|
|
def getSchema(self):
    """Return the XML schema context of this element, or None.

    The result is cached in self._schema; the string "none" is cached when
    there is no schema. The schema file is searched, in order: in the
    "schema" attribute of the element, in the "yep:schema" configuration
    entry, then in the element feature registered in modules.
    """
    if self._schema is None:
        absolutePath = None
        # First choice: a schema attribute on the element itself.
        attributeNodes = self.evaluateXpath("@schema")
        if attributeNodes:
            absolutePath = self.convertNodesToAbsolutePath(
                attributeNodes, self, "@schema", default = None)
        # Second choice: a schema element in the configuration.
        if absolutePath is None:
            absolutePath = self.getConfigAbsolutePath(
                "yep:schema", default = None, ignoreGeneralValue = True)
        # Last choice: a schema declared by a Python module.
        if absolutePath is None:
            try:
                feature = modules.getElementFeature(self.getNamespaceUri(), self.node.name)
            except KeyError:
                pass
            else:
                absolutePath = feature.schemaAbsolutePath

        if absolutePath is None:
            self._schema = "none"
        else:
            import xmlschemas
            holder = xmlschemas.getSchemaHolder(absolutePath)
            prototype = holder.getRootElement()
            self._schema = prototype.newContext(self, previous = self)
    cachedSchema = self._schema
    if cachedSchema == "none":
        return None
    return cachedSchema
|
|
|
|
def getSource(self):
    """Return the content of the "src" attribute, or None when it is absent."""
    sourceNodes = self.evaluateXpath("@src")
    if not sourceNodes:
        return None
    return sourceNodes[0].content
|
|
|
|
def getType(self):
    """Return the global element type the schema gives for this element.

    Returns None when the element has no schema.
    """
    elementSchema = self.schema
    if elementSchema is not None:
        return elementSchema.getGlobalElementType(self, self.node.name)
    return None
|
|
|
|
def layoutRequiresForm(self):
    """Return the description's instanceLayoutRequiresForm() result.

    Returns False when the element has no description.
    """
    description = self.getDescription()
    if description is None:
        return False
    return description.instanceLayoutRequiresForm()
|
|
|
|
def newContext(self, specimen, *attributes, **keywords):
    """Return an ElementContext with self as prototype and the given specimen.

    Extra positional and keyword arguments are passed to ElementContext.
    """
    elementContext = ElementContext(self, specimen, *attributes, **keywords)
    return elementContext
|
|
|
|
def newElement(self, elementNode):
    """Wrap elementNode in an element whose previous station and owner are self."""
    # There doesn't exist any URI path to access an element => no uriPathFragment.
    wrappedElement = newElement(elementNode, previous = self, owner = self)
    return wrappedElement
|
|
|
|
def newInTemporaryDocument(cls, defaultNamespaceUri = None):
    """Create an element of this class as the root of a new temporary document.

    The document comes from documents.newTemporaryDocument(). Returns the
    new root element.
    """
    import documents

    temporaryDocument = documents.newTemporaryDocument(
        defaultNamespaceUri = defaultNamespaceUri)
    root = cls(None, previous = temporaryDocument)
    temporaryDocument.append(root)
    return root
newInTemporaryDocument = classmethod(newInTemporaryDocument)
|
|
|
|
def newNamespace(self, namespaceUri, namespacePrefix = "none"):
    """Create a namespace definition for namespaceUri and return it.

    The definition is stored on a dummy node kept in self.dummyNodes,
    because it seems that a namespace definition cannot be removed from a
    node once it has been added to it.

    namespacePrefix is only a fallback: the prefix that the namespaces
    module associates with namespaceUri is preferred when it exists.

    When the element has an owner, the request is forwarded to that owner.

    Raises Exception for an externally owned element, and KeyError when
    namespaceUri is unknown to the namespaces module and no fallback prefix
    was given.
    """
    if self.owner == "external":
        raise Exception(
            "Unable to add a new namespace to an externally owned element = %s" % self)
    if self.owner is not None:
        if namespacePrefix == "none":
            return self.owner.newNamespace(namespaceUri)
        # Forward the caller's fallback prefix as well; it used to be dropped
        # here, so a URI without standard prefix raised KeyError in the owner
        # even though a usable prefix had been supplied.
        # NOTE(review): assumes every owner's newNamespace accepts a prefix
        # as second argument -- confirm for document owners.
        return self.owner.newNamespace(namespaceUri, namespacePrefix)
    # Resolve the prefix before creating the dummy node, so that a failure
    # does not leave an unused node behind in self.dummyNodes.
    try:
        # Try to replace namespacePrefix with the standard one.
        namespacePrefix = namespaces.getName(namespaceUri)
    except KeyError:
        # Keep the given namespacePrefix if there is one.
        if namespacePrefix == "none":
            raise
    dummyNode = libxml2.newNode("dummy")
    if self.dummyNodes is None:
        self.dummyNodes = []
    self.dummyNodes.append(dummyNode)
    return dummyNode.newNs(namespaceUri, namespacePrefix)
|
|
|
|
def outputHttpSource(self):
    """Send the serialized element through the current HTTP request handler.

    The MIME type and the modification time come from the data holder of
    the element.
    """
    serializedData = self.serialize()
    holder = self.getDataHolder()
    requestHandler = environs.getVar("httpRequestHandler")
    requestHandler.outputData(
        serializedData, mimeType = holder.mimeType,
        modificationTime = holder.getModificationTime())
|
|
|
|
def prepareForSaving(self):
    """Hook giving the element a chance to check itself; does nothing here.

    Described by the original author as useful after a form submit, to
    trigger custom actions. Subclasses may override it.
    """
    return None
|
|
|
|
def register(cls, namespaceUri, elementName):
    """Register this class in modules for elementName in namespaceUri."""
    modules.registerElementClass(namespaceUri, elementName, cls)
register = classmethod(register)
|
|
|
|
def setContentAtXpath(self, xpath, content):
    """Set the content of the attribute or element located at xpath.

    The work is done by setDescendantNodeContent(), starting from the
    wrapped node; the target node is created when it does not exist.
    """
    self.setDescendantNodeContent(xpath, content)
|
|
|
|
def setDescendantNodeContent(self, xpath, content, ancestorNode = None):
    """Set the content of the descendant node located at xpath.

    The search starts at ancestorNode (self.node by default). The descendant
    node is created when it does not exist.

    Supported xpath forms include "(A|ns:B|C)[5]/D/@attribute".
    """
    if ancestorNode is None:
        ancestorNode = self.node

    if xpath.startswith("@"):
        # Attribute of the ancestor node itself.
        ancestorNode.setProp(xpath[1:], content)
        return

    lastSlash = xpath.rfind("/")
    if lastSlash >= 0 and xpath[lastSlash + 1:].startswith("@"):
        # Attribute of a descendant element.
        elementXpath = xpath[:lastSlash]
        matchingNodes = self.evaluateXpath(elementXpath, ancestorNode)
        if matchingNodes:
            targetNode = matchingNodes[0]
        else:
            targetNode = self.createDescendantElementNode(elementXpath, ancestorNode)
        # Skip both the "/" and the "@".
        targetNode.setProp(xpath[lastSlash + 2:], content)
        return

    # Element, either a direct child or a deeper descendant.
    matchingNodes = self.evaluateXpath(xpath, ancestorNode)
    if matchingNodes:
        targetNode = matchingNodes[0]
        targetNode.setContent(targetNode.doc.encodeEntitiesReentrant(content))
    else:
        self.createDescendantElementNode(xpath, ancestorNode, content)
|
|
|
|
def setElementAttribute(self, prefixedName, attributeName, attributeValue):
    """Set an attribute on the child element named prefixedName.

    The first node matching prefixedName is used. When none matches, a
    child is created from the local part of prefixedName, without explicit
    namespace.
    """
    # FIXME: Handle default namespace better.
    matchingNodes = self.evaluateXpath(prefixedName)
    if matchingNodes:
        targetNode = matchingNodes[0]
    else:
        # Drop the prefix, if any.
        localName = prefixedName.split(":", 1)[-1]
        # FIXME: Handle the case where node namespace is not the same as
        # self.node namespace.
        # Note: Don't use newNode & addChild instead of newTextChild,
        # because after addChild, the namespace of subnode is empty
        # instead of being inherited (and subsequent xpath queries
        # fail).
        targetNode = self.node.newTextChild(None, localName, None)
    targetNode.setProp(attributeName, attributeValue)
|
|
|
|
def setElementContent(self, prefixedName, content):
    """Replace the content of the child element named prefixedName.

    When a node matches prefixedName, all of its children are removed and a
    single text node holding content is added. Otherwise a new child is
    created with that content; its namespace is the one bound to the prefix
    of prefixedName, declared through newNamespace() when it is not in
    scope yet.
    """
    matchingNodes = self.evaluateXpath(prefixedName)
    if matchingNodes:
        targetNode = matchingNodes[0]
        # Note: Don't use setContent, because for some nodes it tries to
        # convert entities references.
        # Remove every existing child. The previous code unlinked the first
        # child and then called freeNodeList() on it; once unlinked, that
        # child no longer points to its siblings, so the following children
        # were left in place and the old content was only partly replaced.
        child = targetNode.children
        while child is not None:
            following = child.next
            child.unlinkNode()
            child.freeNode()
            child = following
        targetNode.addChild(targetNode.doc.newDocText(content))
        return

    if ":" in prefixedName:
        namespacePrefix, name = prefixedName.split(":", 1)
    else:
        namespacePrefix = None
        name = prefixedName
    try:
        namespace = self.node.searchNs(self.node.doc, namespacePrefix)
    except libxml2.treeError:
        # The libxml2 Python binding raises this exception when xmlSearchNs returns None.
        namespace = self.newNamespace(namespaces.getUri(namespacePrefix))
    # Note: Don't use newNode & addChild instead of newTextChild,
    # because after addChild, the namespace of subnode is empty
    # instead of being inherited (and subsequent xpath queries
    # fail).
    self.node.newTextChild(namespace, name, content)
|
|
|
|
def translate(self):
    """Replace the content of the element with its translation, if any.

    The serialized children are looked up with _(). When the result differs
    and can be parsed as XML, the current children are removed and replaced
    with the parsed translation, and self.translated is set to True. An
    unparsable translation is logged and the element is left untouched.
    """
    serializedChildren = []
    child = self.node.children
    while child:
        serializedChildren.append(child.serialize())
        child = child.next
    original = "".join(serializedChildren)
    if not original:
        return
    translation = _(original)
    if translation == original:
        return

    try:
        # Wrap the translation in an element declaring every known
        # namespace, so that prefixed names inside it can be parsed.
        namespaceDeclarations = " ".join(
            ["xmlns:%s=\"%s\"" % (ns.name, ns.uri) for ns in namespaces.nsList])
        wrappedTranslation = \
            "<?xml version=\"1.0\"?>\n<yep:translation %s>%s</yep:translation>" % (
                namespaceDeclarations, translation)
        parseOptions = (
            libxml2.XML_PARSE_DTDLOAD | libxml2.XML_PARSE_NONET
            | libxml2.XML_PARSE_NOWARNING)
        doc = libxml2.readDoc(wrappedTranslation, "", None, parseOptions)
    except libxml2.treeError:
        logs.debug("Unparsable translation for %s: %s" % (self.serialize(), translation))
        return

    # Drop the untranslated children one by one.
    child = self.node.children
    while child:
        following = child.next
        child.unlinkNode()
        child.freeNode()
        child = following
    # NOTE(review): the parsed document is not freed here and its root still
    # references the moved children -- confirm this is intended.
    self.node.addChildList(doc.getRootElement().children)
    self.translated = True
|
|
|
|
def walk(self, uriPathFragments, command = None, instruction = None):
    """Walk the URI path, with element-specific fallbacks on path faults.

    The inherited walk() is tried first. When it raises a path fault
    (forbidden, not found, unauthorized):
      - the description of the element, if any, handles the walk;
      - otherwise, when a "yep:mode" configuration entry is named after the
        first path fragment and has a "yep:xsltFilePath", the walk continues
        in an XslTransformStation built from it;
      - otherwise the original fault is raised again.
    """
    try:
        return super(Element, self).walk(uriPathFragments, command, instruction)
    except (faults.PathForbidden, faults.PathNotFound, faults.PathUnauthorized):
        description = self.getDescription()
        if description:
            return description.checkAccessAndWalk(uriPathFragments, command, instruction)

        if not uriPathFragments:
            raise

        # The fragment comes from the requested URI, so quote it as an XPath
        # string literal rather than pasting it between apostrophes: XPath
        # 1.0 has no escape character, hence the switch of quote style and
        # the concat() form when both kinds of quotes occur. A fragment
        # without apostrophe yields the same expression as before.
        modeName = uriPathFragments[0]
        if "'" not in modeName:
            modeLiteral = "'%s'" % modeName
        elif '"' not in modeName:
            modeLiteral = '"%s"' % modeName
        else:
            modeLiteral = "concat(%s)" % ", \"'\", ".join(
                ["'%s'" % part for part in modeName.split("'")])
        modes, nodeOwner = self.getConfigNodesAndOwner(
            "yep:mode[@name = %s]" % modeLiteral)
        if modes:
            mode = modes[0]
            xsltFilePathNodes = xpaths.evaluateXpath("yep:xsltFilePath", mode)
            if xsltFilePathNodes:
                station = stations.XslTransformStation(
                    xsltFilePathNodes[0], nodeOwner, previous = self,
                    uriPathFragment = uriPathFragments[0])
                return station.walk(uriPathFragments, command, instruction)
        raise
|
|
|
|
|
|
# Read-only attribute-style access to the getters defined above.
id = property(fget = getId)
schema = property(fget = getSchema)
source = property(fget = getSource)
type = property(fget = getType)
|
|
|
|
|
|
class ElementContext(stations.AbstractContext):
    """Context whose generation methods are delegated to its prototype element."""

    def generatePlainText(self):
        """Return the plain text generated by the prototype for this context."""
        prototype = self.prototype
        return prototype.generatePlainTextContext(self)

    def generateXml(self, layout):
        """Generate XML into layout through the prototype; return its result."""
        prototype = self.prototype
        return prototype.generateXmlContext(self, layout)
|
|
|
|
|
|
class ElementFeature(object):
    """Everything registered for one kind of element (namespace URI + name).

    Attributes left unset in the constructor keep the class default None.
    """

    descriptionAbsolutePath = None
    elementClass = None
    elementName = None
    holderClass = None
    holderConstructor = None
    namespaceUri = None
    schemaAbsolutePath = None

    def __init__(self, namespaceUri, elementName, elementClass = None, schemaAbsolutePath = None,
            descriptionAbsolutePath = None, holderClass = None, holderConstructor = None):
        """Store every argument that is not None as an instance attribute."""
        givenValues = (
            ("namespaceUri", namespaceUri),
            ("elementName", elementName),
            ("elementClass", elementClass),
            ("schemaAbsolutePath", schemaAbsolutePath),
            ("descriptionAbsolutePath", descriptionAbsolutePath),
            ("holderClass", holderClass),
            ("holderConstructor", holderConstructor),
            )
        for attributeName, value in givenValues:
            if value is not None:
                setattr(self, attributeName, value)

    def newXmlHolder(self, directoryHolder, pathFragment = None, uriPathFragment = None,
            temporary = False):
        """Create and return a holder for a new document of this kind of element.

        holderConstructor is used when it is set. Otherwise
        dataholders.newXmlHolder() is called, with the default file name
        extension of holderClass, or ".xml" when there is no holderClass.
        """
        if self.holderConstructor is not None:
            return self.holderConstructor(
                directoryHolder, pathFragment = pathFragment, uriPathFragment = uriPathFragment,
                temporary = temporary)
        if self.holderClass is None:
            fileNameExtension = ".xml"
        else:
            fileNameExtension = self.holderClass.defaultFileNameExtension
        import dataholders
        return dataholders.newXmlHolder(
            self.namespaceUri, self.elementName, directoryHolder, pathFragment = pathFragment,
            uriPathFragment = uriPathFragment, fileNameExtension = fileNameExtension,
            temporary = temporary)

    def register(self):
        """Register this feature, and its element and holder classes if any, in modules."""
        namespaceUri = self.namespaceUri
        elementName = self.elementName
        modules.registerElementFeature(namespaceUri, elementName, self)
        if self.elementClass is not None:
            self.elementClass.register(namespaceUri, elementName)
        if self.holderClass is not None:
            self.holderClass.register()
            modules.registerHolderClass(namespaceUri, elementName, self.holderClass)
|
|
|
|
|
|
class AuthenticatedUsers(Element):
    """Special group that designates every authenticated user.

    When the attribute "authenticationMethods" is missing, it means every authenticated user;
    otherwise, it means every user authenticated with one of the methods listed in attribute
    "authenticationMethods".
    """

    name = N_("Authenticated Users")

    def containsUser(self):
        """Tell whether the current user belongs to this group.

        Returns False without user. Returns True for any user when no
        authentication method is required. Otherwise returns whether the
        authentication method of the current session is one of the required
        ones (False without session).
        """
        user = environs.getVar("user")
        if user is None:
            return False
        authenticationMethods = self.getAuthenticationMethods()
        if authenticationMethods is None:
            return True
        session = environs.getVar("session")
        if session is None:
            return False
        authenticationMethod = session.getAuthenticationMethod()
        return authenticationMethod in authenticationMethods

    def getAuthenticationMethods(self):
        """Return the list of accepted authentication methods, or None.

        The methods are read from the "authenticationMethods" attribute, as
        the class documentation states. The previous lookup only searched a
        child element "yep:authenticationMethods", so the attribute was
        ignored and every authenticated user was accepted; the child element
        is still accepted for backward compatibility.
        """
        methodNodes = self.evaluateXpath(
            "@authenticationMethods | yep:authenticationMethods")
        if not methodNodes:
            return None
        # split() without argument tolerates repeated or surrounding blanks.
        return methodNodes[0].content.split()
|
|
|
|
|
|
class Everybody(Element):
    """Special group that designates every user, even the unauthenticated ones."""

    name = N_("Everybody")

    def containsUser(self):
        """Return True: this group contains every user."""
        return True
|
|
|
|
|
|
class List(Element):
    """Element whose items are the nodes selected by itemsXpath."""

    # XPath, relative to the element, that selects the items.
    itemsXpath = "*"

    def __iter__(self):
        """Yield an element wrapping each node selected by itemsXpath."""
        for itemNode in self.evaluateXpath(self.itemsXpath):
            yield self.newElement(itemNode)
|
|
|
|
|
|
class RootElement(Element):
    """An XML element which can be the root element of a document."""

    # Overrides the isUriDirectory flag for root elements.
    isUriDirectory = True
|
|
|
|
|
|
class SimpleList(Element):
    """List of simple items (ie items with only a content)."""

    # Name given to the child nodes created by append().
    itemName = None
    # XPath, relative to the element, that selects the items.
    itemsXpath = "*"

    def __contains__(self, itemContent):
        """Tell whether an item has a content equal to itemContent."""
        for existingContent in self:
            if existingContent == itemContent:
                return True
        return False

    def __iter__(self):
        """Yield the content of each node selected by itemsXpath."""
        for itemNode in self.evaluateXpath(self.itemsXpath):
            yield itemNode.content

    def append(self, itemContent):
        """Add a child named itemName, holding itemContent, to the wrapped node."""
        self.node.newTextChild(None, self.itemName, itemContent)
|
|
|
|
|
|
class UsersSet(List):
    """List of users, groups and special groups, used for membership tests."""

    def containsUser(self):
        """Tell whether the current user belongs to this set.

        Items are examined in order:
          - "authenticatedUsers" matches an existing user, optionally
            restricted to the methods listed in its "authenticationMethods"
            attribute;
          - "everybody" always matches;
          - any other item matches when its source URI is the URI of the
            user;
          - "group" items are checked last, through the group documents
            found under "/groups".
        """
        session = environs.getVar("session")
        if session is None:
            authenticationMethod = None
        else:
            authenticationMethod = session.getAuthenticationMethod()
        user = environs.getVar("user")
        if user is None:
            userAbsoluteUri = None
        else:
            userAbsoluteUri = user.getAbsoluteUri()
        groupAbsoluteUris = []
        for item in self:
            itemNode = item.node
            if itemNode.name == "authenticatedUsers":
                if user is None:
                    continue
                authenticationMethods = itemNode.prop("authenticationMethods")
                if authenticationMethods is None:
                    return True
                # Compare against the individual methods. Testing membership
                # in the raw attribute string was a substring test, and
                # raised TypeError when there is no session (method None).
                if authenticationMethod in authenticationMethods.split():
                    return True
                continue
            if itemNode.name == "everybody":
                return True
            itemSource = item.source
            if itemSource is None:
                continue
            itemAbsoluteUri = self.constructAbsoluteUri(itemSource)
            if itemAbsoluteUri == userAbsoluteUri:
                return True
            elif itemNode.name == "group":
                # Groups are looked up only when no direct match is found.
                groupAbsoluteUris.append(itemAbsoluteUri)
        if groupAbsoluteUris:
            groupsHolder = self.walkToLocation("/groups")
            for groupAbsoluteUri in groupAbsoluteUris:
                groupHolder = groupsHolder.getItem(
                    locations.extractUriLocalId(groupAbsoluteUri))
                if groupHolder is not None:
                    group = groupHolder.getRootElement()
                    if group.contains(userAbsoluteUri, authenticationMethod):
                        return True
        return False
|
|
|
|
|
|
def newElement(node, previous = None, parent = None, uriPathFragment = None, owner = "undefined"):
    """Wrap the libxml2 element node in an instance of the most specific class.

    The class is searched, in order:
      1. in the "pythonClass" attribute of the node, when it names an
         importable object;
      2. in the "yep:pythonClass" configuration entry seen from parent (or
         from previous when there is no parent);
      3. among the classes registered in modules for the namespace URI and
         name of the node.

    The remaining arguments are passed unchanged to the class constructor.
    """
    assert node.type == "element"
    try:
        namespaceUri = node.ns().content
    except libxml2.treeError:
        # The libxml2 Python binding raises this exception when xmlNs returns None.
        namespaceUri = None
    elementName = node.name
    class_ = None
    # If this element specifies its Python class with attribute "pythonClass" and it is valid,
    # then use it.
    classPathNodes = xpaths.evaluateXpath("@pythonClass", node)
    if classPathNodes:
        class_ = _importPythonClass(classPathNodes[0].content)
    if class_ is None:
        # Otherwise, look for element Python class in config.
        if parent is None:
            parentStation = previous
        else:
            parentStation = parent
        if parentStation is not None:
            class_ = parentStation.getConfigElementPythonClass(
                namespaceUri, elementName, "yep:pythonClass", default = None,
                ignoreGeneralValue = True)
    if class_ is None:
        # Otherwise, look for an element Python class registered by a Python module.
        class_ = modules.getElementClass(namespaceUri, elementName)
    return class_(
        node = node, previous = previous, parent = parent, uriPathFragment = uriPathFragment,
        owner = owner)


def _importPythonClass(classPath):
    """Return the object named by the dotted classPath, or None when it is invalid.

    A path without module part, a module that cannot be imported and a
    missing attribute are all reported through logs.info().
    """
    # NOTE(review): classPath comes from the XML document, so this imports
    # whatever module the document names -- confirm that documents carrying a
    # "pythonClass" attribute are always trusted.
    lastDot = classPath.rfind(".")
    if lastDot < 0:
        # No module part; this used to be an assert, which is stripped by -O.
        logs.info('Unknown Python class = "%s". Using default class.' % classPath)
        return None
    try:
        # __import__ returns the top-level package; walk down from there.
        component = __import__(classPath[:lastDot])
        for componentName in classPath.split(".")[1:]:
            component = getattr(component, componentName)
    except (ImportError, AttributeError):
        # ImportError was not handled before, so a stale class path broke the
        # loading of the element instead of falling back to the default class.
        logs.info('Unknown Python class = "%s". Using default class.' % classPath)
        return None
    return component
|
|
|
|
|
|
def registerElement(namespaceUri, elementName, elementClass = None, schemaLocation = None,
        descriptionLocation = None, holderClass = None, holderConstructor = None):
    """Register everything known about the element elementName of namespaceUri.

    schemaLocation and descriptionLocation are either absolute http(s) URLs,
    resolved through the XML catalog, or paths relative to the Expression
    data directories. They are converted to absolute file paths (None when
    the file cannot be found) and stored, with the other arguments, in an
    ElementFeature which is then registered.
    """
    schemaAbsolutePath = _resolveDataFilePath(schemaLocation, "schema")
    descriptionAbsolutePath = _resolveDataFilePath(descriptionLocation, "description")
    elementFeature = ElementFeature(
        namespaceUri, elementName, elementClass, schemaAbsolutePath, descriptionAbsolutePath,
        holderClass, holderConstructor)
    elementFeature.register()


def _resolveDataFilePath(location, kind):
    """Return the absolute path of the file designated by location, or None.

    kind ("schema" or "description") is only used in the debug message
    logged when the file is missing.
    """
    if not location:
        return None
    if location.startswith("http://") or location.startswith("https://"):
        # Location is an absolute URL. Use it as a system identifier in XML catalog.
        fileUri = libxml2.catalogResolveSystem(location)
        if fileUri is None:
            logs.debug("Missing %s file '%s'." % (kind, location))
        elif fileUri.startswith("file://"):
            # Only a file URI can be turned into a path. The schema lookup
            # used to strip the first seven characters of any URI without
            # this check, unlike the description lookup.
            return fileUri[len("file://"):]
        return None
    # The location is relative to the Python data directories.
    assert not location.startswith("/")
    for dataDirectory in ("/usr/local/share/expression", "/usr/share/expression"):
        absolutePath = os.path.normpath(os.path.join(dataDirectory, location))
        if os.access(absolutePath, os.R_OK):
            return absolutePath
    logs.debug("Missing %s file '%s'." % (kind, location))
    return None
|
|
|
|
|
|
# Register the element classes of this module in the yep namespace, in the
# same order as before, then declare the class used for unregistered elements.
for _elementName, _elementClass in (
        ("authenticatedUsers", AuthenticatedUsers),
        ("administrators", UsersSet),
        ("element", Element),
        ("everybody", Everybody),
        ("users", UsersSet),
        ):
    registerElement(namespaces.yep.uri, _elementName, _elementClass)
# Do not leave the loop variables behind in the module namespace.
del _elementName, _elementClass

modules.setDefaultElementClass(Element)
|