This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
glasnost/shared/proxy/DispatcherProxy.py

817 lines
27 KiB
Python

# -*- coding: iso-8859-15 -*-
# Glasnost
# By: Odile Bénassy <obenassy@entrouvert.com>
# Romain Chantereau <rchantereau@entrouvert.com>
# Nicolas Clapiès <nclapies@easter-eggs.org>
# Pierre-Antoine Dejace <padejace@entrouvert.be>
# Thierry Dulieu <tdulieu@easter-eggs.com>
# Florent Monnier <monnier@codelutin.com>
# Cédric Musso <cmusso@easter-eggs.org>
# Frédéric Péters <fpeters@entrouvert.be>
# Benjamin Poussin <poussin@codelutin.com>
# Emmanuel Raviart <eraviart@entrouvert.com>
# Sébastien Régnier <regnier@codelutin.com>
# Emmanuel Saracco <esaracco@easter-eggs.com>
#
# Copyright (C) 2000, 2001 Easter-eggs & Emmanuel Raviart
# Copyright (C) 2002 Odile Bénassy, Code Lutin, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Frédéric Péters, Benjamin Poussin, Emmanuel Raviart,
# Emmanuel Saracco & Théridion
# Copyright (C) 2003 Odile Bénassy, Romain Chantereau, Nicolas Clapiès,
# Code Lutin, Pierre-Antoine Dejace, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Florent Monnier, Cédric Musso, Ouvaton, Frédéric Péters,
# Benjamin Poussin, Rodolphe Quiédeville, Emmanuel Raviart, Sébastien
# Régnier, Emmanuel Saracco, Théridion & Vecam
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Glasnost Dispatcher Proxy."""
__version__ = '$Revision$'[11:-2]
import errno
import marshal
import os
import socket
import time
import xmlrpclib
import glasnost.common.faults as faults
import glasnost.common.context as context
import glasnost.common.tools_new as commonTools
from tools import *
serverAccessors = {}
try:
socket.gaierror
except AttributeError:
socket.gaierror = None
try:
socket.sslerror
except AttributeError:
socket.sslerror = None
class FastServerProxy:
def __init__(self, transport=None, encoding=None, verbose=0):
self.__transport = transport
self.__encoding = encoding
self.__verbose = verbose
def __request(self, methodname, params):
request = {
'methodName': methodname,
'params': params
}
request = marshal.dumps(request)
response = self.__transport.request(
self.__host,
self.__handler,
request,
verbose=self.__verbose
)
if len(response) == 1:
response = response[0]
return response
def __getattr__(self, name):
if name.startswith('__'): raise AttributeError('blah')
# magic method dispatcher
return xmlrpclib._Method(self.__request, name)
class FastTransport:
"""Handles a short-circuited connection to an XML-RPC server"""
def __init__(self, host, port):
self.address = '/tmp/.glasnost:%s' % port
def request(self, host, handler, request_body, verbose=0):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.address)
s.send('%s\n' % len(request_body))
s.send(request_body)
answerHeader = ''
while not answerHeader:
try:
answerHeader = s.recv(12)
except socket.error, exception:
if exception[0] == errno.ECONNRESET:
# TODO: server got shutdown; webserver should send 503
pass
raise
try:
length = int(answerHeader)
except ValueError:
raise Exception('Invalid FastTransport header received = "%s"'
% answerHeader)
r = ''
while len(r) != length:
r += s.recv(50000)
r = marshal.loads(r)
if type(r) is type([]):
fault = faults.Fault(r[0], r[1])
if fault.faultCode in faults.faults.keys():
fault.__class__ = faults.faults[fault.faultCode]
raise fault
return r
class Lazy:
"""Remember a value in a sequence.
This class is used to save temporarly a value (usual in a sequence or
exception), without interpretate it.
The value is interpreted when the instance is called.
It is possible to specify a handler to interpretate the result, without it,
the returned value is the first value in the *value* sequence. With it, it
is the function return.
If the value is an dictionnary, it is interpretated as an exception, raised
only at the instance call.
Attributes:
===========
*resultHandler*:
The interpretation function name string.
*value*:
The value sequence to remember.
Constructor:
============
Keyword parameters:
-------------------
*value*:
The value sequence to remember.
*resultHandler*:
The interpretation function name string.
Instance Call:
==============
+ Return the interpretated result, calling the *resultHandler* function
with a new Lazy instance of the given *value* sequence (without
resultHandler).
The *resultHandler* function must call the given instance to retrieve the
first *value* sequence value or to raise the exception.
+ Raise the exception stored in *value*.
+ Return the *value* value.
"""
resultHandler = None
value = None
def __call__(self):
if self.resultHandler:
return self.resultHandler(Lazy(self.value))
else:
if type(self.value) == type({}):
# Value is a fault.
fault = faults.Fault(self.value['faultCode'],
self.value['faultString'])
if fault.faultCode in faults.faults.keys():
fault.__class__ = faults.faults[fault.faultCode]
raise fault
else:
# Value is a sequence containing the real value.
return self.value[0]
def __init__(self, value, resultHandler = None):
self.value = value
if resultHandler is not None:
self.resultHandler = resultHandler
class MultiCall:
"""Manage simultaneous XML RPCs.
A MultiCall instance is filled by calls. These calls are
Attributes:
===========
*calls*:
Sequence of calls. A call is a dictionnary defined as:
methodName:
The public XML RP name.
parameters:
The parameters to pass to the XML RP.
*earlyResults*:
Dictionnary of already known results (faults or returns). This
dictionnary has numeric keys. The key is the position in the calls
returns. No call is associated with an early result. All entry of the
dictionnary are sequences (componed of only one value).
*resultHandlers*:
Sequence of result handler function name string in the same order than
the calls.
*serverAccessor*:
The informations of the server to call.
Constructors:
=============
The calls and result handlers sequences and the dictionnary of already
recieved results are emptied.
"""
calls = None
earlyResults = None
resultHandlers = None
serverAccessor = None
def __init__(self):
self.calls = []
self.earlyResults = {}
self.resultHandlers = []
def add(self, serverAccessor, functionName, arguments, resultHandler):
"""Add a planned call to the multi call manager.
Keyword Arguments:
==================
*serverAccessor*:
The target server access informations.
*functionName*:
The public XML RP name to call.
*arguments*:
The arguments sequence to pass to the XML RP.
*resultHandler*:
An function name string. It will be called to pre-process the XML
RPC return.
Exceptions:
===========
*AssertionError*:
+ The server accessor is not the server of the instanciated
multicall process.
+ The port to send to is not the same as the one specified in the
instanciated multicall process.
"""
if self.serverAccessor is None:
self.serverAccessor = serverAccessor
else:
assert self.serverAccessor == serverAccessor
self.calls.append({
'methodName': functionName,
'params': arguments,
})
self.resultHandlers.append(resultHandler)
def addEarlyFault(self, earlyFault):
"""Add a already known exception result.
This method permits adding cached XML RPC Faults results.
If you want to add a already known results, see *addEarlyResult*
method.
This method adds the Fault instance in a one-element sequence to the
*earlyResults* dictionnary at the current position.
There is no associated call to the result.
Keyword argument:
=================
*earlyFault*:
The Fault instance to store in the early results dictionnary.
Exceptions:
===========
*AttributeError*:
The given *earlyFault* is not a Fault instance.
"""
i = len(self.calls) + len(self.earlyResults)
self.earlyResults[i] = {
'faultCode': earlyFault.faultCode,
'faultString': earlyFault.faultString,
}
def addEarlyResult(self, earlyResult):
"""Add a already known good result.
This method permits adding cached XML RPC non-fault results.
If you want to add a already known faults, see *addEarlFault* method.
This method add the result nito a one-element sequence to the
*earlyResults* dictionnary at the current position.
There is no associated call to the result.
Keyword argument:
=================
*earlyResult*:
The result to store in the early results dictionnary.
"""
i = len(self.calls) + len(self.earlyResults)
self.earlyResults[i] = [earlyResult]
def call(self):
"""Send the stacked XML RPC.
Send a multicall XML RP. The calls are sent in multicall mode (see
xmlrpclib).
Return a sequence of results. The results are in the same order as the
individual XML RPC (including the early result).
Exception:
==========
*Fault.\**:
The XML RPC fault exception.
"""
handledResults = []
if self.calls:
results = _callServer(self.serverAccessor, 'system.multicall',
[self.calls])
for i in range(len(self.resultHandlers)):
resultHandler = self.resultHandlers[i]
result = results[i]
handledResult = Lazy(result, resultHandler = resultHandler)
handledResults.append(handledResult)
earlyResultIndexes = self.earlyResults.keys()
earlyResultIndexes.sort()
for i in earlyResultIndexes:
handledResults.insert(i, Lazy(self.earlyResults[i]))
return handledResults
def _callServer(serverAccessor, functionName, arguments):
"""Internal server call function.
Keyword arguments:
==================
*serverAccessor*:
The informations of the server to call.
*functionName*:
The public RP name to call.
*arguments*:
The arguments sequence to pass to the called XML RP.
"""
serverProxy = serverAccessor['serverProxy']
try:
result = apply(getattr(serverProxy, functionName), arguments)
except socket.gaierror, exception:
if exception[0] == -3: # Temporary failure in name resolution
raise faults.UnknownDispatcherInId(serverAccessor['serverId'])
if exception[0] == -2: # Name or service not known
raise faults.UnknownDispatcherInId(serverAccessor['serverId'])
raise
except socket.error, exception:
if exception[0] in (
errno.ETIMEDOUT,
errno.ECONNREFUSED,
errno.EHOSTDOWN,
errno.EHOSTUNREACH, ):
raise faults.UnknownDispatcherInId(serverAccessor['serverId'])
raise
except xmlrpclib.ProtocolError, exception:
if exception.errcode == 406:
# Protocol Error: 406 Not Acceptable.
raise faults.UnknownDispatcherInId(serverAccessor['serverId'])
raise
except faults.Fault, fault:
if fault.faultCode in faults.faults.keys():
fault.__class__ = faults.faults[fault.faultCode]
raise
return result
def callServer(serverId, functionName, arguments, resultHandler = None,
multiCall = None):
"""Send a XML Rpc to the server.
Keyword arguments:
==================
*serverId*:
The server ID string to send Xml RPC.
*functionName*:
The public method name string to call on the server.
*arguments*:
The called server method arguments sequence.
*resultHandler*:
The fonction name string called to handle the XML RP result.
*multiCall*:
If true (or 1), the rpc is stacked in order to use XML RPC multiCall.
Return:
=======
*None*:
The call is going to be send in a multi-call.
*Unknown*:
+ The XML RP return.
+ The result of the resultHandler function if given.
Exception:
==========
*AssertionError*:
The server ID is not defined. (is None)
*AttributeError*:
The call procedure doesn't exist.
*ProtocolError*:
A protocol error in the underlying transport layer (such as a 404 'not
found' error if the server named by the URI does not exist).
*Fault.\**:
A Fault object encapsulates the content of an XML-RPC fault tag.
*Exception*:
All kind of exceptions raised by the called Remote Procedure.
"""
assert serverId is not None
# Ensure that serverId is valid.
serverId = commonTools.extractServerId(serverId)
if serverId == context.getVar('applicationId'):
server = context.getVar('server')
if server is not None:
result = server.rpcServer.dispatch_call(
functionName, arguments, isDirectCall = 1)
if multiCall:
multiCall.addEarlyResult(result)
return None
return result
if not serverAccessors.has_key(serverId) or (
serverAccessors[serverId].has_key('isDefault')
and serverAccessors[serverId]['isDefault']
and serverAccessors[serverId].has_key('creationTime')
and time.time() > serverAccessors[serverId]['creationTime'] + 120):
try:
serverAccessors[serverId] = getServerAccessor(serverId)
except faults.UnknownServerId, fault:
if multiCall is not None:
if resultHandler:
result = {
'faultCode': fault.faultCode,
'faultString': fault.faultString,
}
try:
result = resultHandler(Lazy(result))
multiCall.addEarlyResult(result)
except fault:
multiCall.addEarlyFault(fault)
else:
multiCall.addEarlyFault(fault)
return None
else:
raise
serverAccessor = serverAccessors[serverId]
if serverAccessor.has_key('isGateway') and serverAccessor['isGateway']:
assert len(arguments) >= 3
arguments = [arguments[0], arguments[1], arguments[2], serverId,
functionName, arguments]
functionName = 'callGateway'
if multiCall is not None:
# Instead of calling directly, pack all the calls in a multiCall.
multiCall.add(serverAccessor, functionName, arguments, resultHandler)
return None
if resultHandler is None:
return _callServer(serverAccessor, functionName, arguments)
else:
try:
result = [_callServer(serverAccessor, functionName, arguments)]
except faults.Fault, fault:
result = {
'faultCode': fault.faultCode,
'faultString': fault.faultString,
}
result = resultHandler(Lazy(result))
return result
def getApplicationId(applicationToken):
"""Get the application ID from the application token.
Keyword arguments:
==================
*applicationToken*:
The valid application token.
Return the application ID string.
Exception:
==========
*faults.UnknownApplicationToken*:
The given application token is not in the dispatcher virtual server ids
dictionnary.
"""
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
return callServer(
dispatcherId,
'getApplicationId',
[dispatcherId, getApplicationToken(), userToken, applicationToken])
def getApplicationToken():
applicationToken = context.getVar('applicationToken')
if applicationToken is None:
userToken = context.getVar('userToken', default = '')
applicationId = context.getVar('applicationId')
dispatcherId = context.getVar('dispatcherId', default = '')
applicationToken = callServer(
dispatcherId,
'getApplicationToken',
[dispatcherId, '', userToken, applicationId])
context.setVar('applicationToken', applicationToken)
return applicationToken
def getDispatcherAccessor(dispatcherId):
"""Return the informations needed to access the dispatcher."""
userToken = context.getVar('userToken', default = '')
currentDispatcherId = context.getVar('dispatcherId', default = '')
dispatcherHostNameAndPort = extractApplicationHostNameAndPort(dispatcherId)
dispatcherInfos = dispatcherHostNameAndPort.split(':', 1)
dispatcherHostName = dispatcherInfos[0]
if dispatcherHostName in ['help', 'system']:
dispatcherHostName = context.getVar('dispatcherHostName')
if len(dispatcherInfos) < 2:
dispatcherPort = context.getVar('dispatcherPort')
else:
dispatcherPort = int(dispatcherInfos[1])
dispatcherAccessor = {
'serverId': dispatcherId,
'serverHostName': dispatcherHostName,
'serverPort': dispatcherPort,
}
dispatcherHostNameAndPort = '%s:%s' % (dispatcherHostName, dispatcherPort)
# Try a fast local connection.
defaultDispatcherAccessor = None
transport = FastTransport(dispatcherHostName, dispatcherPort)
if os.path.exists(transport.address):
connectionType = 'local'
dispatcherProxy = FastServerProxy(transport = transport)
dispatcherAccessor['connectionType'] = connectionType
dispatcherAccessor['serverProxy'] = dispatcherProxy
try:
newDispatcherAccessor = _callServer(
dispatcherAccessor,
'getServerAccessor',
[currentDispatcherId, '', userToken, dispatcherId])
except faults.UnknownDispatcherInId:
pass
else:
newDispatcherAccessor['connectionType'] = connectionType
newDispatcherAccessor['serverId'] = dispatcherId
newDispatcherAccessor['serverProxy'] = dispatcherProxy
if not newDispatcherAccessor.has_key('isDefault') \
or not newDispatcherAccessor['isDefault']:
return newDispatcherAccessor
# The newDispatcherAccessor has been given by default by the local
# dispatcher. Before using it, we need to ensure that there is no
# remote dispatcher which can really handle this dispatcherId.
defaultDispatcherAccessor = newDispatcherAccessor
defaultDispatcherAccessor['creationTime'] = time.time()
# Try a XMLRPC SSL connection.
if socket.sslerror:
connectionType = 'xmlrpc-ssl'
dispatcherProxy = xmlrpclib.ServerProxy(
'https://%s' % dispatcherHostNameAndPort, allow_none = 1)
dispatcherAccessor['connectionType'] = connectionType
dispatcherAccessor['serverProxy'] = dispatcherProxy
try:
newDispatcherAccessor = _callServer(
dispatcherAccessor,
'getServerAccessor',
[currentDispatcherId, '', userToken, dispatcherId])
except socket.sslerror:
pass
except faults.UnknownDispatcherInId:
if defaultDispatcherAccessor is None:
raise
# There is no remote dispatcher for this dispatcherId. Return the
# default local accessor.
return defaultDispatcherAccessor
else:
newDispatcherAccessor['connectionType'] = connectionType
newDispatcherAccessor['serverId'] = dispatcherId
newDispatcherAccessor['serverProxy'] = dispatcherProxy
return newDispatcherAccessor
# Try a XMLRPC connection.
connectionType = 'xmlrpc'
dispatcherProxy = xmlrpclib.ServerProxy(
'http://%s' % dispatcherHostNameAndPort, allow_none = 1)
dispatcherAccessor['connectionType'] = connectionType
dispatcherAccessor['serverProxy'] = dispatcherProxy
try:
newDispatcherAccessor = _callServer(
dispatcherAccessor,
'getServerAccessor',
[currentDispatcherId, '', userToken, dispatcherId])
except faults.UnknownDispatcherInId:
if defaultDispatcherAccessor is None:
raise
# There is no remote dispatcher for this dispatcherId. Return the
# default local accessor.
return defaultDispatcherAccessor
newDispatcherAccessor['connectionType'] = connectionType
newDispatcherAccessor['serverId'] = dispatcherId
newDispatcherAccessor['serverProxy'] = dispatcherProxy
return newDispatcherAccessor
def getRegisteredRoles(dispatcherId = None):
userToken = context.getVar('userToken', default = '')
if not dispatcherId:
dispatcherId = context.getVar('dispatcherId', default = '')
return callServer(
dispatcherId,
'getRegisteredRoles',
[dispatcherId, getApplicationToken(), userToken])
def getServerAccessor(serverId):
"""Return the informations needed to access a server."""
serverAccessor = context.getVar('directServerAccessor')
if serverAccessor is None:
dispatcherId = commonTools.extractDispatcherId(serverId)
if not serverAccessors.has_key(dispatcherId) or (
serverAccessors[dispatcherId].has_key('isDefault')
and serverAccessors[dispatcherId]['isDefault']
and serverAccessors[dispatcherId].has_key('creationTime')
and time.time() > serverAccessors[dispatcherId]['creationTime']
+ 120):
serverAccessors[dispatcherId] = getDispatcherAccessor(dispatcherId)
dispatcherAccessor = serverAccessors[dispatcherId]
if not commonTools.extractRole(serverId):
return dispatcherAccessor
userToken = context.getVar('userToken', default = '')
currentDispatcherId = context.getVar('dispatcherId', default = '')
serverAccessor = _callServer(
dispatcherAccessor,
'getServerAccessor',
[currentDispatcherId, '', userToken, serverId])
connectionType = dispatcherAccessor['connectionType']
else:
serverAccessor = serverAccessor.copy()
connectionType = 'local'
serverHostName = serverAccessor['serverHostName']
serverPort = serverAccessor['serverPort']
serverHostNameAndPort = '%s:%s' % (serverHostName, serverPort)
if connectionType == 'local':
transport = FastTransport(serverHostName, serverPort)
assert os.path.exists(transport.address)
serverProxy = FastServerProxy(transport = transport)
elif connectionType == 'xmlrpc-ssl':
assert socket.sslerror
serverProxy = xmlrpclib.ServerProxy(
'https://%s' % serverHostNameAndPort, allow_none = 1)
else:
assert connectionType == 'xmlrpc'
serverProxy = xmlrpclib.ServerProxy(
'http://%s' % serverHostNameAndPort, allow_none = 1)
serverAccessor['connectionType'] = connectionType
serverAccessor['serverId'] = serverId
serverAccessor['serverProxy'] = serverProxy
if serverAccessor.has_key('isDefault') and serverAccessor['isDefault']:
serverAccessor['creationTime'] = time.time()
return serverAccessor
def registerDispatcherId(newDispatcherId, profiles):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
callServer(
dispatcherId,
'registerDispatcherId',
[dispatcherId, getApplicationToken(), userToken, newDispatcherId,
profiles])
def registerServer(serverHostName, serverPort):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
return callServer(
dispatcherId,
'registerServer',
[dispatcherId, getApplicationToken(), userToken, serverHostName,
serverPort])
def registerVirtualServer(serverHostName, serverPort, serverId):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
callServer(
dispatcherId,
'registerVirtualServer',
[dispatcherId, getApplicationToken(), userToken, serverHostName,
serverPort, serverId])
def unregisterServer(serverHostName, serverPort):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
callServer(
dispatcherId,
'unregisterServer',
[dispatcherId, getApplicationToken(), userToken, serverHostName,
serverPort])
def unregisterVirtualServer(serverId):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
callServer(
dispatcherId,
'unregisterVirtualServer',
[dispatcherId, getApplicationToken(), userToken, serverId])
def updateDispatcherIdProfiles(newDispatcherId, profiles):
userToken = context.getVar('userToken', default = '')
dispatcherId = context.getVar('dispatcherId', default = '')
callServer(
dispatcherId,
'updateDispatcherIdProfiles',
[dispatcherId, getApplicationToken(), userToken,
newDispatcherId, profiles])