This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.

453 lines
14 KiB

# -*- coding: iso-8859-15 -*-
# Glasnost
# By: Odile Bénassy <>
# Romain Chantereau <>
# Nicolas Clapiès <>
# Pierre-Antoine Dejace <>
# Thierry Dulieu <>
# Florent Monnier <>
# Cédric Musso <>
# Frédéric Péters <>
# Benjamin Poussin <>
# Emmanuel Raviart <>
# Sébastien Régnier <>
# Emmanuel Saracco <>
# Copyright (C) 2000, 2001 Easter-eggs & Emmanuel Raviart
# Copyright (C) 2002 Odile Bénassy, Code Lutin, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Frédéric Péters, Benjamin Poussin, Emmanuel Raviart,
# Emmanuel Saracco & Théridion
# Copyright (C) 2003 Odile Bénassy, Romain Chantereau, Nicolas Clapiès,
# Code Lutin, Pierre-Antoine Dejace, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Florent Monnier, Cédric Musso, Ouvaton, Frédéric Péters,
# Benjamin Poussin, Rodolphe Quiédeville, Emmanuel Raviart, Sébastien
# Régnier, Emmanuel Saracco, Théridion & Vecam
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
__doc__ = """Glasnost Common Tools"""
__version__ = '$Revision$'[11:-2]
import imp
import md5
import re
import os
import smtplib
import types
from xml.dom import minidom
except ImportError:
minidom = None
import new
import math
import glasnost
except ImportError:
class G:
applicationName = 'glasnost'
glasnost = G()
import faults
import context
import tools_new as commonTools
is8bit = re.compile("[\x80-\xff]").search
def extractApplicationHostName(id):
"""Return the application host name part of a token or application ID.
Keyword argument:
The application id /token string.
*Return the application host name string*.
The *id* is malformed.
The string has no fields separated by ':'.
Some tokens are malformed, so be careful.
return extractApplicationHostNameAndPort(id).split(':')[0]
except IndexError:
raise Exception('Malformed id or application id = %s' % str(id))
def extractApplicationHostNameAndPort(id):
assert id.startswith('glasnost://')
splittedApplicationId = id[11:].split('/', 1)
# applicationHostNameAndPort, applicationRole = splittedApplicationId
return splittedApplicationId[0]
except IndexError:
raise Exception('Malformed id or application id = %s' % str(id))
def extractApplicationPort(id):
infos = id[11:].split(':', 1)
if len(infos) < 2:
return context.getVar('dispatcherPort')
return infos[1]
_cachedCommonsByServerRole = {}
def getCommonForServerRole(serverRole):
if context.getVar('knownRoles') \
and serverRole not in context.getVar('knownRoles'):
return None
serverRoleCommon = commonTools.getConfig(
context.getVar('dispatcherId'), serverRole + '-common',
default = serverRole).replace('-', '')
if not _cachedCommonsByServerRole.has_key(serverRole):
# Code inspired from the module knee.
import glasnost.common
commonFileNames = os.listdir(glasnost.common.__path__[0])
for commonFileName in commonFileNames:
if commonFileName.endswith('') \
and commonFileName[:-9].lower() == serverRoleCommon:
commonName = commonFileName[:-3]
if hasattr(glasnost.common, commonName):
module = getattr(glasnost.common, commonName)
moduleTriplet = imp.find_module(
commonName, glasnost.common.__path__)
module = imp.load_module(
'glasnost.common.%s' % commonName,
moduleTriplet[0], moduleTriplet[1],
if moduleTriplet[0]:
setattr(glasnost.common, commonName, module)
common = module.__dict__[commonName + 'Mixin']()
return None
_cachedCommonsByServerRole[serverRoleCommon] = common
return _cachedCommonsByServerRole[serverRoleCommon]
def getDefaultDispatcherHostNameAndPort():
return context.getVar('dispatcherId')[11:]
def getStringFromDigest(object, digest):
"""Extract a string from an object via the MD5 digest.
Keyword arguments:
+ Formelly the path to the string location
+ A object containing the string.
+ A dictionnary of path (key or value).
+ A Sequence of path.
+ A string (carriage return are ommitted).
No corresponding string found.
The string that produce the given MD5 digest (without the carriage
objectType = type(object)
if objectType in [types.StringType, types.UnicodeType]:
if'\r\n', '\n')).hexdigest() == digest:
return object
return None
elif objectType in [types.ListType, types.TupleType]:
for item in object:
result = getStringFromDigest(item, digest)
if result is not None:
return result
elif objectType == types.DictionaryType:
for key, value in object.items():
result = getStringFromDigest(key, digest)
if result is not None:
return result
result = getStringFromDigest(value, digest)
if result is not None:
return result
return None
elif objectType == types.InstanceType:
for key, value in object.__dict__.items():
result = getStringFromDigest(key, digest)
if result is not None:
return result
result = getStringFromDigest(value, digest)
if result is not None:
return result
return None
return None
def iso8859_15(s):
"""Convert a string from utf-8 to iso-8859-15."""
# FIXME: due to yet untraced errors I switched from latin-0 to latin-1
if type(s) == types.UnicodeType:
return s.encode('latin-1')
elif is8bit(s):
return unicode(s, 'utf-8').encode('latin-1')
except UnicodeError:
return s
return s
def repairId(id):
if not id or id.startswith('glasnost://'):
return None
serverId, localId = id.split('/', 1)
assert serverId.startswith('ee')
assert serverId.endswith('Server')
serverRole = serverId[2:-6].lower()
except IndexError:
raise Exception('Malformed old id = %s' % str(id))
newId = '%s/%s/%s' % (context.getVar('dispatcherId'), serverRole, localId)
return newId
def sendMail(mailFrom, mailTo, mailSubject, mailMessage, mailPerson = None,
moreHeaders = None):
"""Send a Email.
Keyword arguments:
The sender email address field string.
The destination address(es) field string.
The mail subject line string.
The message string to send.
If the Person object has GPG abilities, the Person object has to be
gived in order to test if the mail have to be crypted.
Default: None.
If other hearders have to be included in the mail, this dictionnary
provide them. The key is the field name, the value, the field value.
Default: None.
The mail cannot be send. (Problem with the SMTP gateway/server).
if type(mailTo) in [types.StringType, types.UnicodeType]:
mailToStr = mailTo
mailTo = [mailTo]
mailToStr = ', '.join(mailTo)
mailHeader = "From: %s\nTo: %s\nSubject: %s\nMime-Type: 1.0\n" % (
mailFrom, mailToStr, mailSubject)
if moreHeaders:
mailHeader += '\n'.join([ '%s: %s' % x for x in moreHeaders.items()])
mailMessage = "\n" + mailMessage
gpgEncrypted = 0
if mailPerson:
from import getProxyForServerRole
preferencesProxy = getProxyForServerRole('preferences')
cryptEmails = preferencesProxy.getPreferenceByUserId(
if cryptEmails:
from gpg import Gpg
gpg = Gpg(email =,
fingerprint = mailPerson.fingerprint)
mailMessage = gpg.encrypt(mailMessage)
gpgEncrypted = gpg.isEncrypted(mailMessage)
except: # TODO: tighter check
if gpgEncrypted:
mailHeader += """\
Content-Type: multipart/encrypted; boundary=bound1;
Content-Type: application/pgp-encrypted
Version: 1
Content-Type: application/octet-stream
mailMessage = "%s\n%s" % (mailHeader, mailMessage)
if gpgEncrypted:
mailMessage += "\n--bound1--\n"
mailMessage = mailMessage.replace('\n', '\r\n')
smtpServerHostName = context.getVar('smtpServerHostName')
smtpServerPort = context.getVar('smtpServerPort')
smtpServer = smtplib.SMTP()
smtpServer.connect(smtpServerHostName, smtpServerPort)
smtpServer.sendmail(mailFrom, mailTo, mailMessage)
except: # TODO: tighter check
raise faults.SmtpError()
def splitApplicationId(applicationId):
assert applicationId.startswith('glasnost://')
splittedApplicationId = applicationId[11:].split('/', 2)
assert len(splittedApplicationId) == 2
# applicationHostNameAndPort, applicationRole = splittedApplicationId
except IndexError:
raise Exception('Malformed application id = %s' % str(applicationId))
except AssertionError:
raise Exception('Malformed application id = %s' % str(applicationId))
return splittedApplicationId
def splitId(id):
assert id.startswith('glasnost://')
splittedId = id[11:].split('/', 2)
assert len(splittedId) == 3
# applicationHostNameAndPort, applicationRole, localId = splittedId
except IndexError:
raise Exception('Malformed id = %s' % str(id))
except AssertionError:
raise Exception('Malformed id = %s' % str(id))
return splittedId
def utf8(s):
"""Convert a string from iso-8859-15 to utf-8."""
if type(s) == types.UnicodeType:
return s.encode('utf-8')
elif is8bit(s):
return unicode(s, 'latin-1').encode('utf-8')
return s
# Useful code cribbed from Mark Pilgrim:
# add useful 'text' method to minidom.Element class
def _text(self):
"""returns all text of a node in one string
text may be split into several Text nodes; minidom likes to create
separate Text nodes for carriage returns and ampersands and things"""
def isTextNode(node):
return isinstance(node, minidom.Text)
def getData(node):
return "".join(map(getData, filter(isTextNode, self.childNodes)))
return ""
if minidom:
minidom.Element.text = new.instancemethod(_text, None, minidom.Element)
class XmlToDict:
"""Map XML elements into an easier to use dictionary.
Add known namespaces URIs to be processed via knownNamespaces
XmlToDict.knownNamespaces["URI"] = prefix"""
separator = ':'
def __init__(self):
self.knownNamespaces = { u'' : u'dc' }
def isKnownElementNode(self, node):
return isinstance(node, minidom.Element) and \
((node.namespaceURI == None and -1 == node.tagName.find(":") ) or \
def knownNameValuePair(self, element):
tagName = element.tagName.strip()
tagNameParts = tagName.split(":")
if len(tagNameParts) == 2 and \
self.knownNamespaces.has_key(element.namespaceURI) and \
self.knownNamespaces != None:
tagName = self.knownNamespaces[element.namespaceURI] + \
self.separator + tagNameParts[1]
return (tagName, element.text())
def mapElementsToDictionary(self, nodeList):
return dict(map(self.knownNameValuePair, \
filter(self.isKnownElementNode, nodeList)))
def convertNodesToDict(nodeList, knownNamespaces = {}):
"""This function takes a list of nodes and returns a
dictionary that maps nodes names to node values. The optional
'knownNamespaces' provides a way to map known namespace URIs
to fixed prefixes."""
converter = XmlToDict()
for (k, v) in knownNamespaces.iteritems():
converter.knownNamespaces[k] = v
return converter.mapElementsToDictionary(nodeList)