# File: passerelle/passerelle/apps/cmis/models.py
# (repository viewer listing: 420 lines, 15 KiB, Python)

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import binascii
import functools
import json
import re
from contextlib import contextmanager
from io import BytesIO
from urllib import error as urllib2
import httplib2
import requests
from cmislib import CmisClient
from cmislib.exceptions import (
CmisException,
InvalidArgumentException,
ObjectNotFoundException,
PermissionDeniedException,
UpdateConflictException,
)
from django.contrib.postgres.fields import JSONField
from django.db import models
from django.http import HttpResponse
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError, JSONEncoder
from passerelle.utils.logging import ignore_loggers
# Punctuation accepted in file and folder names, in addition to word
# characters (\w).  The string is also interpolated verbatim into the
# user-facing pattern descriptions of the upload schema.
SPECIAL_CHARS = '!#$%&+-^_`~;[]{}+=~'

# Absolute path: either the bare root "/" or one or more "/<name>" segments,
# without a trailing slash.
FILE_PATH_PATTERN = r'^(/|(/[\w%s]+)+)$' % re.escape(SPECIAL_CHARS)

# File name: word characters, SPECIAL_CHARS and dots only.  The pattern is
# anchored on both ends because JSON-schema "pattern" is an unanchored
# search: without the leading "^" any value merely *ending* with valid
# characters (e.g. "a/b.pdf" or "../x") would be accepted.
FILE_NAME_PATTERN = r'^[\w%s\.]+$' % re.escape(SPECIAL_CHARS)
# JSON schema of the request body accepted by the "uploadfile" endpoint.
UPLOAD_SCHEMA = {
    'type': 'object',
    'title': _('CMIS file upload'),
    'properties': {
        # File payload.  Only "content" is mandatory at this level; it is
        # base64-decoded by CmisConnector._validate_inputs.
        'file': {
            'title': _('File object'),
            'type': 'object',
            'properties': {
                'filename': {
                    'type': 'string',
                    'description': _('Filename'),
                    'pattern': FILE_NAME_PATTERN,
                    'pattern_description': _('Numbers, letters and special caracters "%s" are allowed.')
                    % SPECIAL_CHARS,
                },
                'content': {
                    'type': 'string',
                    'description': _('Content'),
                },
                'content_type': {
                    'type': 'string',
                    'description': _('Content type'),
                },
            },
            'required': ['content'],
        },
        # Optional top-level file name; when present and non-empty it is
        # used instead of file["filename"].
        'filename': {
            'type': 'string',
            'description': _('Filename (takes precendence over filename in "file" object)'),
            'pattern': FILE_NAME_PATTERN,
            'pattern_description': _('Numbers, letters and special caracters "%s" are allowed.')
            % SPECIAL_CHARS,
        },
        # Destination folder inside the repository.
        'path': {
            'type': 'string',
            'description': _('File path'),
            'pattern': FILE_PATH_PATTERN,
            'pattern_description': _('Must include leading but not trailing slash.'),
        },
        # Optional CMIS type id; stored as the "cmis:objectTypeId" property
        # of the created document.
        'object_type': {
            'type': 'string',
            'description': _('CMIS object type'),
        },
        # Extra CMIS properties forwarded at document creation; every value
        # must be a string.
        'properties': {
            'type': 'object',
            'title': _('CMIS properties (dictionary with string keys)'),
            'additionalProperties': {'type': 'string'},
        },
    },
    'required': ['file', 'path'],
    # NOTE(review): "unflatten" is not a standard JSON-schema keyword; it is
    # presumably interpreted by the endpoint machinery to expand flattened
    # keys into nested objects - confirm against passerelle.utils.api.
    'unflatten': True,
}
# JSON schema of the request body accepted by the "watch" endpoint.
WATCH_SCHEMA = {
    'type': 'object',
    'title': _('Watch object'),
    'properties': {
        # A value containing "/" is resolved as a repository path, anything
        # else as a CMIS object id.
        'object_id': {
            'type': 'string',
            'description': _('Object ID of file (can also be a path)'),
        },
        # URL receiving a POST with the new metadata once a change is
        # detected; only the http:// or https:// prefix is checked here.
        'callback_url': {'type': 'string', 'pattern': '^https?://'},
    },
    'required': ['object_id', 'callback_url'],
}
# JSON schema of the request body accepted by the "check-object" endpoint.
CHECK_OBJECT_SCHEMA = {
    'type': 'object',
    'title': _('Check object'),
    'properties': {
        # Must be the same string that was given when the watch was
        # registered: watches are looked up by exact object_id match.
        'object_id': {
            'type': 'string',
            'description': _('Object ID of file (can also be a path)'),
        },
    },
    'required': ['object_id'],
}
class CmisConnector(BaseResource):
    """Connector giving access to a CMIS repository through its Atom endpoint.

    Exposes endpoints to upload a document, download it, read its metadata
    and register one-shot "watches" that POST to a callback URL once the
    metadata of an object has changed.
    """

    cmis_endpoint = models.URLField(
        max_length=400, verbose_name=_('CMIS Atom endpoint'), help_text=_('URL of the CMIS Atom endpoint')
    )
    username = models.CharField(max_length=128, verbose_name=_('Service username'))
    password = models.CharField(max_length=128, verbose_name=_('Service password'))

    category = _('File Storage')

    class Meta:
        verbose_name = _('CMIS connector')

    def check_status(self):
        """Access the default repository; errors from the CMIS client propagate to the caller."""
        with self.get_cmis_gateway() as cmis_gateway:
            cmis_gateway.repo  # pylint: disable=pointless-statement

    @endpoint(
        description=_('File upload'),
        perm='can_access',
        post={
            'request_body': {
                'schema': {
                    'application/json': UPLOAD_SCHEMA,
                }
            }
        },
    )
    def uploadfile(self, request, post_data):
        """Create a document in the repository from a base64-encoded payload.

        Returns ``{'data': {'properties': ...}}`` with the properties of the
        created document.  Raises ``APIError`` (HTTP 400) when the payload has
        no file name or its content is not valid base64.
        """
        error, error_msg, data = self._validate_inputs(post_data)
        if error:
            self.logger.debug("received invalid data: %s", error_msg)
            raise APIError(error_msg, http_status=400)
        # the top-level filename takes precedence over the one in "file"
        filename = data.get('filename') or data['file']['filename']
        self.logger.info("received file_name: '%s', file_path: '%s'", filename, data["path"])
        with self.get_cmis_gateway() as cmis_gateway:
            doc = cmis_gateway.create_doc(
                filename,
                data['path'],
                data['file_byte_content'],
                content_type=data['file'].get('content_type'),
                object_type=data.get('object_type'),
                properties=data.get('properties'),
            )
            return {'data': {'properties': doc.properties}}

    @contextmanager
    def get_cmis_gateway(self):
        """Yield a ``CMISGateway`` for this connector while the cmislib loggers are ignored."""
        with ignore_loggers('cmislib', 'cmislib.atompub.binding'):
            yield CMISGateway(self.cmis_endpoint, self.username, self.password, self.logger)

    def _validate_inputs(self, data):
        """Check the upload payload and decode the file content.

        Returns a tuple ``(error, error_msg, data)``.  On success ``error`` is
        False and ``data`` is the input dict with an added
        ``'file_byte_content'`` key holding the decoded bytes; on failure
        ``error`` is True, ``error_msg`` describes the problem and ``data`` is
        None.
        """
        file_ = data['file']
        if 'filename' not in file_ and 'filename' not in data:
            return True, '"filename" or "file[\'filename\']" is required', None
        try:
            data['file_byte_content'] = base64.b64decode(file_['content'])
        except (TypeError, binascii.Error, ValueError):
            # binascii.Error covers malformed base64; a plain ValueError is
            # raised when the string contains non-ASCII characters
            return True, '"file[\'content\']" must be a valid base64 string', None
        return False, '', data

    @staticmethod
    def _fetch_object(cmis_gateway, object_id):
        """Return the CMIS object designated by *object_id*; a value containing "/" is looked up as a path."""
        if '/' in object_id:
            return cmis_gateway.get_object_by_path(object_id)
        return cmis_gateway.get_object(object_id)

    def _get_metadata(self, object_id):
        """Return the properties of an object as nested dicts.

        Property names are split on ":" so that ``'cmis:name'`` ends up as
        ``metadata['cmis']['name']``.
        """
        with self.get_cmis_gateway() as cmis_gateway:
            doc = self._fetch_object(cmis_gateway, object_id)
            metadata = {}
            for key, value in doc.properties.items():
                *namespaces, leaf = key.split(':')
                sub_metadata = metadata
                for subkey in namespaces:
                    sub_metadata = sub_metadata.setdefault(subkey, {})
                sub_metadata[leaf] = value
            return metadata

    @endpoint(
        description=_('Get file'),
        perm='can_access',
        parameters={
            'object_id': {
                'description': _('Object ID of file (can also be a path)'),
            }
        },
    )
    def getfile(self, request, object_id):
        """Return the content stream of a document as an ``HttpResponse``.

        The content type is the document's ``cmis:contentStreamMimeType``
        property, or ``application/octet-stream`` when it has none.
        """
        with self.get_cmis_gateway() as cmis_gateway:
            doc = self._fetch_object(cmis_gateway, object_id)
            try:
                mime_type = doc.properties['cmis:contentStreamMimeType']
            except KeyError:
                mime_type = 'application/octet-stream'
            bytes_io = doc.getContentStream()
            return HttpResponse(bytes_io, content_type=mime_type)

    @endpoint(
        description=_('Get file metadata'),
        perm='can_access',
        parameters={
            'object_id': {
                'description': _('Object ID of file (can also be a path)'),
            }
        },
    )
    def getmetadata(self, request, object_id):
        """Return the nested metadata of an object under the ``'data'`` key."""
        return {'data': self._get_metadata(object_id)}

    @endpoint(
        description=_('Watch object'),
        perm='can_access',
        post={
            'request_body': {
                'schema': {
                    'application/json': WATCH_SCHEMA,
                }
            }
        },
    )
    def watch(self, request, post_data):
        """Register a watch on an object, storing a snapshot of its current metadata.

        Raises ``APIError`` when an active watch (one not cancelled yet)
        already exists on this connector for the same object and callback
        URL.  Watches that already fired do not prevent watching again.
        """
        object_id = post_data['object_id']
        # fetched first so that an unknown object is reported before anything is stored
        metadata = self._get_metadata(object_id)
        callback_url = post_data['callback_url']
        # a watch is active as long as "cancelled" is NULL
        if ObjectWatch.objects.filter(
            resource=self, object_id=object_id, callback_url=callback_url, cancelled__isnull=True
        ).exists():
            raise APIError('This file is already watched')
        object_watch = ObjectWatch.objects.create(
            resource=self, object_id=object_id, metadata=metadata, callback_url=callback_url
        )
        return {
            'data': {
                'object_id': object_id,
                'watch_id': object_watch.id,
                'metadata': metadata,
                'callback_url': callback_url,
            }
        }

    @endpoint(
        description=_('Check object'),
        name='check-object',
        perm='can_access',
        post={
            'request_body': {
                'schema': {
                    'application/json': CHECK_OBJECT_SCHEMA,
                }
            }
        },
    )
    def check_object(self, request, post_data):
        """Check the active watches of this connector on a single object."""
        return {'data': self._check_objects(post_data['object_id'])}

    def _check_objects(self, object_id=None):
        """Check active watches of this connector and notify the changed ones.

        When *object_id* is given only the watches on that object are
        checked.  For every watch whose metadata changed, the new metadata is
        POSTed to the callback URL; the watch is cancelled once the callback
        answered successfully.

        Returns a list with one dict per checked watch, with keys
        ``'object_id'``, ``'changed'``, ``'callback_err'`` (0 or 1) and, on
        callback failure, ``'callback_err_desc'``.
        """
        res = []
        # restricted to this connector: the objects are fetched with its credentials
        qs = ObjectWatch.objects.filter(resource=self, cancelled__isnull=True)
        if object_id:
            qs = qs.filter(object_id=object_id)
        for object_watch in qs:
            obj_res = {'object_id': object_watch.object_id, 'changed': True, 'callback_err': 0}
            res.append(obj_res)
            metadata = self._get_metadata(object_watch.object_id)
            # simulate a back and forth to database, to get datetimes as strings
            metadata = json.loads(JSONEncoder().encode(metadata))
            # try to compare on lastModificationDate
            new_mod_date = metadata.get('cmis', {}).get('lastModificationDate', '')
            old_mod_date = object_watch.metadata.get('cmis', {}).get('lastModificationDate', '')
            if new_mod_date and old_mod_date:
                if new_mod_date == old_mod_date:
                    obj_res['changed'] = False
                    continue
            # fallback on all the metadata
            elif metadata and object_watch.metadata:
                if metadata == object_watch.metadata:
                    obj_res['changed'] = False
                    continue
            # metadata changed, call the callback url
            try:
                resp = self.requests.post(object_watch.callback_url, json=metadata)
                resp.raise_for_status()
            except requests.RequestException as e:
                # the watch stays active so the callback is attempted again on the next check
                obj_res['callback_err'] = 1
                obj_res['callback_err_desc'] = str(e)
            else:
                object_watch.cancelled = timezone.now()
                object_watch.save()
        return res

    def daily(self):
        """Check every active watch of this connector."""
        self._check_objects()
class ObjectWatch(models.Model):
    """Watch registered on a CMIS object, with the callback URL to notify on change."""

    # connector the watch belongs to; watches are deleted along with it
    resource = models.ForeignKey(
        CmisConnector,
        on_delete=models.CASCADE,
    )
    # CMIS object id, or repository path, of the watched object
    object_id = models.CharField(
        max_length=128,
        db_index=True,
    )
    callback_url = models.URLField(
        max_length=400,
        verbose_name=_('Callback URL'),
        db_index=True,
    )
    # metadata snapshot the current state of the object is compared against
    metadata = JSONField(encoder=JSONEncoder)
    created = models.DateTimeField(auto_now_add=True)
    # NULL while the watch is active; set once the callback has been delivered
    cancelled = models.DateTimeField(null=True)
def wrap_cmis_error(f):
    """Decorator converting transport and CMIS errors raised by *f* into ``APIError``.

    The specific cmislib exceptions are handled before the generic
    ``CmisException`` so that each one gets its own message prefix.  The
    original exception is attached as the cause of the ``APIError``.
    Exceptions of any other type propagate unchanged.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except (urllib2.URLError, httplib2.HttpLib2Error) as e:
            # FIXME urllib2 still used for cmslib 0.5 compat
            raise APIError("connection error: %s" % e) from e
        except PermissionDeniedException as e:
            raise APIError("permission denied: %s" % e) from e
        except UpdateConflictException as e:
            raise APIError("update conflict: %s" % e) from e
        except InvalidArgumentException as e:
            raise APIError("invalid property name: %s" % e) from e
        except CmisException as e:
            # catch-all for every other cmislib error
            raise APIError("cmis binding error: %s" % e) from e

    return wrapper
class CMISGateway:
    """Small facade over a cmislib ``CmisClient`` bound to its default repository.

    The public methods are wrapped with ``wrap_cmis_error`` so that transport
    and CMIS failures reach the callers as ``APIError``.
    """

    def __init__(self, cmis_endpoint, username, password, logger):
        self._cmis_client = CmisClient(cmis_endpoint, username, password, passerelle_logger=logger)
        self._logger = logger

    @cached_property
    def repo(self):
        """Default repository of the client, looked up once per gateway."""
        return self._cmis_client.defaultRepository

    def _get_or_create_folder(self, file_path):
        """Return the folder at *file_path*, creating the missing path components."""
        try:
            self._logger.debug("searching '%s'", file_path)
            res = self.repo.getObjectByPath(file_path)
            self._logger.debug("'%s' found", file_path)
            return res
        except ObjectNotFoundException:
            self._logger.debug("'%s' not found", file_path)
        # walk down from the root, creating each missing component in turn
        basepath = ""
        folder = self.repo.rootFolder
        for path_part in file_path.strip('/').split('/'):
            basepath += '/%s' % path_part
            try:
                self._logger.debug("searching '%s'", basepath)
                folder = self.repo.getObjectByPath(basepath)
                self._logger.debug("'%s' found", basepath)
            except ObjectNotFoundException:
                self._logger.debug("'%s' not found", basepath)
                folder = folder.createFolder(path_part)
                self._logger.debug("create folder '%s'", basepath)
        return folder

    @wrap_cmis_error
    def create_doc(
        self, file_name, file_path, file_byte_content, content_type=None, object_type=None, properties=None
    ):
        """Create a document named *file_name* in the folder *file_path*.

        The folder is created when it does not exist.  *object_type*, when
        given, is sent as the ``cmis:objectTypeId`` property.  Returns the
        object returned by the folder's ``createDocument``.
        """
        folder = self._get_or_create_folder(file_path)
        # work on a copy: the caller's dict must not gain the object type key
        properties = dict(properties) if properties else {}
        if object_type:
            properties['cmis:objectTypeId'] = object_type
        return folder.createDocument(
            file_name, contentFile=BytesIO(file_byte_content), contentType=content_type, properties=properties
        )

    @wrap_cmis_error
    def get_object_by_path(self, file_path):
        """Return the repository object located at *file_path*."""
        return self.repo.getObjectByPath(file_path)

    @wrap_cmis_error
    def get_object(self, object_id):
        """Return the repository object whose id is *object_id*."""
        return self.repo.getObject(object_id)