passerelle/passerelle/apps/cmis/models.py

168 lines
6.7 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import functools
import httplib2
import json
import re
import urllib2
try:
import cStringIO as StringIO
except ImportError:
import StringIO
from cmislib import CmisClient
from cmislib.exceptions import CmisException
from cmislib.exceptions import ObjectNotFoundException
from cmislib.exceptions import PermissionDeniedException
from cmislib.exceptions import UpdateConflictException
from django.db import models
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
SPECIAL_CHARS = '!#$%&+-^_`~;[]{}+=~'
FILE_PATH_PATTERN = '^(/|(/[\w%s]+)+)$' % re.escape(SPECIAL_CHARS)
FILE_NAME_PATTERN = '[\w%s\.]+' % re.escape(SPECIAL_CHARS)
RE_FILE_PATH = re.compile(FILE_PATH_PATTERN)
RE_FILE_NAME = re.compile(FILE_NAME_PATTERN)
class CmisConnector(BaseResource):
cmis_endpoint = models.URLField(
max_length=400, verbose_name=_('CMIS Atom endpoint'),
help_text=_('URL of the CMIS Atom endpoint'))
username = models.CharField(max_length=128, verbose_name=_('Service username'))
password = models.CharField(max_length=128, verbose_name=_('Service password'))
category = _('Business Process Connectors')
class Meta:
verbose_name = _('CMIS connector')
def get_description_fields(self):
fields = super(CmisConnector, self).get_description_fields()
return [(x[0], x[1]) for x in fields if x[0].name != 'password']
@endpoint(methods=['post'], perm='can_access')
def uploadfile(self, request):
error, error_msg, data = self._validate_inputs(request.body)
if error:
self.logger.debug("received invalid data: %s" % error_msg)
raise APIError(error_msg, http_status=400)
self.logger.info("received file_name: '%s', file_path: '%s'", data['file']['filename'],
data["path"])
cmis_gateway = CMISGateway(self.cmis_endpoint, self.username, self.password, self.logger)
doc = cmis_gateway.create_doc(
data['file']['filename'], data['path'], data['file_byte_content'])
self.logger.info(
"create document '%s' with path '%s'", data['file']['filename'], data['path'])
return {'data': {'properties': doc.properties}}
def _validate_inputs(self, body):
""" process JSON body
return a tuple (error, error_msg, data)
"""
try:
data = json.loads(body)
except ValueError as e:
return True, "could not decode body to json: %s" % e, None
if 'file' not in data:
return True, '"file" is required', None
if 'path' not in data:
return True, '"path" is required', None
if not isinstance(data['file'], dict):
return True, '"file" must be a dict', None
if not isinstance(data['path'], unicode):
return True, '"path" must be string', None
if not RE_FILE_PATH.match(data['path']):
return True, '"path" must be valid path', None
file_ = data['file']
if 'filename' not in file_:
return True, '"file[\'filename\']" is required', None
if not isinstance(file_['filename'], unicode):
return True, '"file[\'filename\']" must be string', None
if not RE_FILE_NAME.match(file_['filename']):
return True, '"file[\'filename\']" must be valid file name', None
if 'content' not in file_:
return True, '"file[\'content\']" is required', None
if not isinstance(file_['content'], unicode):
return True, '"file[\'content\']" must be string', None
try:
data['file_byte_content'] = base64.b64decode(file_['content'])
except TypeError:
return True, '"file[\'content\']" must be a valid base64 string', None
return False, '', data
def wrap_cmis_error(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except (urllib2.URLError, httplib2.HttpLib2Error) as e:
# FIXME urllib2 still used for cmslib 0.5 compat
raise APIError("connection error: %s" % e)
except PermissionDeniedException as e:
raise APIError("permission denied: %s" % e)
except UpdateConflictException as e:
raise APIError("update conflict: %s" % e)
except CmisException as e:
raise APIError("cmis binding error: %s" % e)
return wrapper
class CMISGateway(object):
def __init__(self, cmis_endpoint, username, password, logger):
self._cmis_client = CmisClient(cmis_endpoint, username, password)
self._logger = logger
def _get_or_create_folder(self, file_path):
repo = self._cmis_client.defaultRepository
try:
self._logger.debug("searching '%s'" % file_path)
res = repo.getObjectByPath(file_path)
self._logger.debug("'%s' found" % file_path)
return res
except ObjectNotFoundException:
self._logger.debug("'%s' not found" % file_path)
basepath = ""
folder = repo.rootFolder
for path_part in file_path.strip('/').split('/'):
basepath += '/%s' % path_part
try:
self._logger.debug("searching '%s'" % basepath)
folder = repo.getObjectByPath(basepath)
self._logger.debug("'%s' found" % basepath)
except ObjectNotFoundException:
self._logger.debug("'%s' not found" % basepath)
folder = folder.createFolder(path_part)
self._logger.debug("create folder '%s'" % basepath)
return folder
@wrap_cmis_error
def create_doc(self, file_name, file_path, file_byte_content):
folder = self._get_or_create_folder(file_path)
return folder.createDocument(file_name, contentFile=StringIO.StringIO(file_byte_content))