# passerelle - uniform access to multiple data sources and services # Copyright (C) 2021 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from urllib.parse import parse_qsl, urlsplit, urlunsplit from django.core.cache import cache from django.db import models from django.shortcuts import get_object_or_404 from django.urls import reverse from django.utils.translation import ugettext_lazy as _ from requests import RequestException from passerelle.base.models import BaseQuery, BaseResource from passerelle.compat import json_loads from passerelle.utils.api import endpoint from passerelle.utils.http_authenticators import HttpBearerAuth from passerelle.utils.json import unflatten from passerelle.utils.jsonresponse import APIError from passerelle.utils.templates import render_to_string, validate_template class ParameterTypeError(Exception): http_status = 400 log_error = False class PloneRestApi(BaseResource): service_url = models.CharField( _('Site URL'), max_length=256, blank=False, help_text=_('ex: https://demo.plone.org'), ) token_ws_url = models.CharField( _('Token webservice URL'), max_length=256, blank=True, help_text=_('ex: https://IDP/idp/oidc/token/ or unset for anonymous acces'), ) client_id = models.CharField( _('OIDC id'), max_length=128, blank=True, help_text=_('OIDC id of the connector'), ) client_secret = models.CharField( _('Shared secret'), max_length=128, blank=True, help_text=_('Share secret secret for webservice call authentication'), ) username = models.CharField(_('Username'), max_length=128, blank=True) password = models.CharField(_('Password'), max_length=128, blank=True) category = _('Data Sources') plone_keys_to_rename = ['@id', '@type', '@components'] class Meta: verbose_name = _('Plone REST API Web Service') def export_json(self): data = super(PloneRestApi, self).export_json() data['queries'] = [query.export_json() for query in self.queries.all()] return data @classmethod def import_json_real(cls, overwrite, instance, data, **kwargs): data_queries = data.pop('queries', []) instance = super(PloneRestApi, cls).import_json_real(overwrite, instance, data, **kwargs) queries = [] if instance and overwrite: Query.objects.filter(resource=instance).delete() for data_query in data_queries: query = Query.import_json(data_query) query.resource = instance queries.append(query) Query.objects.bulk_create(queries) return instance def adapt_id_and_type_plone_attributes(self, data): """Rename keys starting with '@' from plone response ex: '@id' is renammed into 'PLONE_id'""" if isinstance(data, list): for value in list(data): self.adapt_id_and_type_plone_attributes(value) elif isinstance(data, dict): for key, value in list(data.items()): self.adapt_id_and_type_plone_attributes(value) if key in self.plone_keys_to_rename and key[0] == '@': data['PLONE_%s' % key[1:]] = value del data[key] def adapt_payload(self, payload): # convert image format for file_field in payload.values(): if isinstance(file_field, dict) and file_field.get('filename'): file_field['encoding'] = 'base64' file_field['data'] = file_field['content'] file_field['content-type'] = file_field['content_type'] del file_field['content'] def adapt_record( self, record, text_template='{{ id }}', id_key='UID', ): self.adapt_id_and_type_plone_attributes(record) for key, value in list(record.items()): # backup original id and text fields if key in ('id', 'text'): key = 'original_%s' % key record[key] = value record['id'] = record.get(id_key) record['text'] = render_to_string(text_template, record).strip() def get_token(self, renew=False): token_key = 'plone-restapi-%s-token' % self.id if not renew and cache.get(token_key): return cache.get(token_key) payload = { 'grant_type': 'password', 'client_id': str(self.client_id), 'client_secret': str(self.client_secret), 'username': self.username, 'password': self.password, 'scope': ['openid'], } headers = { 'Content-Type': 'application/x-www-form-urlencoded', } response = self.requests.post(self.token_ws_url, headers=headers, data=payload) if not response.status_code // 100 == 2: raise APIError(response.content) token = response.json().get('id_token') cache.set(token_key, token, 30) return token def request(self, uri='', uid='', method='GET', params=None, json=None): scheme, netloc, path, query, fragment = urlsplit(self.service_url) if uri: path += '/%s' % uri if uid: path += '/%s' % uid url = urlunsplit((scheme, netloc, path, '', fragment)) headers = {'Accept': 'application/json'} auth = HttpBearerAuth(self.get_token()) if self.token_ws_url else None try: response = self.requests.request( method=method, url=url, headers=headers, params=params, json=json, auth=auth ) except RequestException as e: raise APIError('PloneRestApi: %s' % e) json_response = None if response.status_code != 204: # No Content try: json_response = response.json() except ValueError as e: raise APIError('PloneRestApi: bad JSON response') try: response.raise_for_status() except RequestException as e: raise APIError('PloneRestApi: %s "%s"' % (e, json_response)) return json_response def call_search( self, uri='', text_template='', filter_expression='', sort=None, order=True, limit=None, id=None, q=None, ): query = urlsplit(self.service_url).query params = dict(parse_qsl(query)) if id: params['UID'] = id else: if q is not None: params['SearchableText'] = q if sort: params['sort_on'] = sort if order: params['sort_order'] = 'ascending' else: params['sort_order'] = 'descending' if limit: params['b_size'] = limit params.update(parse_qsl(filter_expression)) params['fullobjects'] = 'y' response = self.request(uri=uri, uid='@search', method='GET', params=params) for record in response.get('items') or []: self.adapt_record(record, text_template) return response.get('items') or [] @endpoint( perm='can_access', description=_('Get content types'), display_order=1, ) def get_content_types(self, request): response = self.request(uri='@types', method='GET') for record in response or []: self.adapt_record(record, '{{ title }}', id_key='PLONE_id') record['id'] = record['id'].split('/')[-1] return {'data': response or []} @endpoint( perm='can_access', description=_('Get content type'), parameters={ 'id': {'description': _('Content type identifier'), 'example_value': 'imio.directory.Contact'} }, display_order=2, ) def get_content_type(self, request, id): response = self.request(uri='@types', uid=id, method='GET') return {'data': response} @endpoint( perm='can_access', description=_('Get field choices'), parameters={ 'id': {'description': _('Field identifier'), 'example_value': 'imio.smartweb.vocabulary.Topics'} }, display_order=3, ) def get_field_choices(self, request, id): response = self.request(uri='@vocabularies', uid=id, method='GET') for record in response.get('items') or []: self.adapt_record(record, '{{ title }}', id_key='token') return {'data': response.get('items') or []} @endpoint( perm='can_access', description=_('Fetch'), parameters={ 'uri': {'description': _('Uri')}, 'uid': {'description': _('Uid')}, 'text_template': {'description': _('Text template')}, }, display_order=4, ) def fetch(self, request, uid, uri='', text_template=''): response = self.request(uri=uri, uid=uid, method='GET') self.adapt_record(response, text_template) return {'data': response} @endpoint( perm='can_access', description=_('Creates'), parameters={ 'uri': {'description': _('Uri')}, 'publish': {'description': _('Do publish content (default is false)')}, }, methods=['post'], display_order=5, ) def create(self, request, uri, publish=False): try: post_data = json_loads(request.body) except ValueError as e: raise ParameterTypeError(str(e)) post_data = unflatten(post_data) self.adapt_payload(post_data) response = self.request(uri=uri, method='POST', json=post_data) uid = response.get('UID') review_state = None if uid and bool(publish): uri += '/%s' % uid response = self.request(uri, uid='@workflow/publish', method='POST') review_state = response.get('review_state') return {'data': {'uid': uid, 'created': True, 'review_state': review_state}} @endpoint( perm='can_access', description=_('Update'), parameters={ 'uri': {'description': _('Uri')}, 'uid': {'description': _('Uid')}, }, methods=['post'], display_order=6, ) def update(self, request, uid, uri=''): try: post_data = json_loads(request.body) except ValueError as e: raise ParameterTypeError(str(e)) post_data = unflatten(post_data) self.adapt_payload(post_data) self.request(uri=uri, uid=uid, method='PATCH', json=post_data) return {'data': {'uid': uid, 'updated': True}} @endpoint( perm='can_access', description=_('Remove'), parameters={ 'uri': {'description': _('Uri')}, 'uid': {'description': _('Uid')}, }, methods=['delete'], display_order=7, ) def remove(self, request, uid, uri=''): self.request(method='DELETE', uri=uri, uid=uid) return {'data': {'uid': uid, 'removed': True}} @endpoint( perm='can_access', description=_('Search'), parameters={ 'uri': {'description': _('Uri')}, 'text_template': {'description': _('Text template')}, 'sort': {'description': _('Sort field')}, 'order': {'description': _('Ascending sort order'), 'type': 'bool'}, 'limit': {'description': _('Maximum items')}, 'id': {'description': _('Record identifier')}, 'q': {'description': _('Full text query')}, }, ) def search( self, request, uri='', text_template='', sort=None, order=True, limit=None, id=None, q=None, **kwargs, ): result = self.call_search(uri, text_template, '', sort, order, limit, id, q) return {'data': result} @endpoint( name='q', description=_('Query'), pattern=r'^(?P[\w:_-]+)/$', perm='can_access', show=False, ) def q(self, request, query_slug, **kwargs): query = get_object_or_404(Query, resource=self, slug=query_slug) result = query.q(request, **kwargs) meta = {'label': query.name, 'description': query.description} return {'data': result, 'meta': meta} def create_query_url(self): return reverse('plone-restapi-query-new', kwargs={'slug': self.slug}) class Query(BaseQuery): resource = models.ForeignKey( to=PloneRestApi, related_name='queries', verbose_name=_('Resource'), on_delete=models.CASCADE ) uri = models.CharField( verbose_name=_('Uri'), max_length=128, help_text=_('uri to query'), blank=True, ) text_template = models.TextField( verbose_name=_('Text template'), help_text=_("Use Django's template syntax. Attributes can be accessed through {{ attributes.name }}"), validators=[validate_template], blank=True, ) filter_expression = models.TextField( verbose_name=_('filter'), help_text=_('Specify more URL parameters (key=value) separated by lines'), blank=True, ) sort = models.CharField( verbose_name=_('Sort field'), help_text=_('Sorts results by the specified field'), max_length=256, blank=True, ) order = models.BooleanField( verbose_name=_('Ascending sort order'), help_text=_("Unset to use descending sort order"), default=True, ) limit = models.PositiveIntegerField( default=10, verbose_name=_('Limit'), help_text=_('Number of results to return in a single call'), ) delete_view = 'plone-restapi-query-delete' edit_view = 'plone-restapi-query-edit' def q(self, request, **kwargs): return self.resource.call_search( uri=self.uri, text_template=self.text_template, filter_expression='&'.join( [x.strip() for x in str(self.filter_expression).splitlines() if x.strip()] ), sort=self.sort, order=self.order, limit=self.limit, id=kwargs.get('id'), q=kwargs.get('q'), ) def as_endpoint(self): endpoint = super(Query, self).as_endpoint(path=self.resource.q.endpoint_info.name) search_endpoint = self.resource.search.endpoint_info endpoint.func = search_endpoint.func endpoint.show_undocumented_params = False # Copy generic params descriptions from original endpoint # if they are not overloaded by the query for param in search_endpoint.parameters: if param in ('uri', 'text_template') and getattr(self, param): continue endpoint.parameters[param] = search_endpoint.parameters[param] return endpoint