# combo - content management system
# Copyright (C) 2014-2018  Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
import urllib.parse
from urllib.parse import quote

import ratelimit.utils
from django.conf import settings
from django.contrib import messages
from django.core.exceptions import DisallowedRedirect, PermissionDenied
from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect, JsonResponse
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View

from combo.utils import DecryptionError, aes_hex_decrypt, requests, sign_url
from combo.utils.misc import get_known_service_for_url, is_url_from_known_service

from .models import TrackingCodeInputCell
from .utils import get_wcs_services


class TrackingCodeView(View):
    """POST-only view that resolves a tracking code and redirects the user.

    The form is identified by the ``cell`` POST field (a
    ``TrackingCodeInputCell`` id); the code to look up is in ``code`` and the
    fallback redirect target in ``url``.
    """

    http_method_names = ['post']

    @csrf_exempt
    def dispatch(self, *args, **kwargs):
        # CSRF check must be disabled as the cell may be distributed to other
        # sites in a skeleton.
        return super().dispatch(*args, **kwargs)

    @classmethod
    def search(cls, code, request, wcs_site=None):
        """Look up a tracking code on the configured w.c.s. services.

        :param code: tracking code as typed by the user; it is stripped and
            upper-cased before the lookup.
        :param request: current request, used as the rate limiting subject
            (the ``ip`` key of the ``trackingcode`` group).
        :param wcs_site: optional key into ``get_wcs_services()``; when empty
            every service returned by ``get_wcs_services()`` is queried.
        :returns: the ``load_url`` value of the first service answering with
            HTTP 200 and ``err == 0``, or ``None`` when no service matched.
        :raises PermissionDenied: when one of the rates listed in
            ``settings.WCS_TRACKING_CODE_RATE_LIMIT`` has been reached.
        """
        code = code.strip().upper()
        if wcs_site:
            wcs_sites = [get_wcs_services().get(wcs_site)]
        else:
            wcs_sites = get_wcs_services().values()

        # The setting is a whitespace-separated list of rates; 'none' (or an
        # empty value) disables rate limiting altogether.
        rate_limit_option = settings.WCS_TRACKING_CODE_RATE_LIMIT
        if rate_limit_option and rate_limit_option != 'none':
            for rate_limit in rate_limit_option.split():
                ratelimited = ratelimit.utils.is_ratelimited(
                    request=request, group='trackingcode', key='ip', rate=rate_limit, increment=True
                )
                if ratelimited:
                    raise PermissionDenied('rate limit reached (%s)' % rate_limit)

        # The code comes straight from user input: escape every reserved
        # character, '/' included, so it always stays a single path segment
        # below /api/code/.
        lookup_path = '/api/code/' + quote(code, safe='')

        for site in wcs_sites:
            if not site:
                # unknown wcs_site key: .get() above returned nothing
                continue
            response = requests.get(lookup_path, remote_service=site, log_errors=False)
            if response.status_code != 200:
                continue
            try:
                payload = response.json()
            except ValueError:
                # body is not JSON; treat it as "not found on this service"
                continue
            if isinstance(payload, dict) and payload.get('err') == 0:
                return payload.get('load_url')

        return None

    def post(self, request, *args, **kwargs):
        """Resolve the posted tracking code and redirect accordingly.

        Responds with 400 when the ``cell`` or ``code`` field is missing or
        invalid, or when ``url`` cannot be parsed.  On success the user is
        redirected to the URL returned by :meth:`search`; otherwise to
        ``url`` (default ``/``), either with an error message queued (same
        domain) or with ``unknown-tracking-code`` appended to the query
        string (other domain).

        :raises DisallowedRedirect: when ``url`` points to another domain
            that ``is_url_from_known_service`` does not accept.
        :raises PermissionDenied: when the lookup is rate limited and the
            redirect target is on another domain.
        """
        try:
            cell = TrackingCodeInputCell.objects.get(id=request.POST['cell'])
        except (KeyError, ValueError, TrackingCodeInputCell.DoesNotExist):
            return HttpResponseBadRequest('Invalid cell id')
        if 'code' not in request.POST:
            return HttpResponseBadRequest('Missing code')
        code = request.POST['code']

        next_url = request.POST.get('url') or '/'
        try:
            next_netloc = urllib.parse.urlparse(next_url).netloc
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. an unbalanced IPv6
            # bracket); this is client supplied, answer 400 instead of 500.
            return HttpResponseBadRequest('Invalid url')
        redirect_to_other_domain = bool(
            next_netloc and next_netloc != urllib.parse.urlparse(request.build_absolute_uri()).netloc
        )

        if redirect_to_other_domain and not is_url_from_known_service(next_url):
            raise DisallowedRedirect('Unsafe redirect to unknown host')

        try:
            url = self.search(code, request, wcs_site=cell.wcs_site)
        except PermissionDenied:
            # A message queued here would not be displayed by another
            # domain, so let the error propagate in that case.
            if redirect_to_other_domain:
                raise
            messages.error(self.request, _('Looking up tracking code is currently rate limited.'))
        else:
            if url:
                return HttpResponseRedirect(url)
            if redirect_to_other_domain:
                # signal the failure to the remote page through the query string
                if '?' in next_url:
                    next_url += '&'
                else:
                    next_url += '?'
                next_url += 'unknown-tracking-code'
            else:
                messages.error(self.request, _('The tracking code could not been found.'))

        return HttpResponseRedirect(next_url)


def tracking_code_search(request):
    """Return a JSON search result for the ``q`` query parameter.

    Only queries made of exactly eight letters from the set
    ``BCDFGHJKLMNPQRSTVWXZ`` (after stripping and upper-casing) are looked
    up.  The response is ``{'data': [...], 'err': 0}`` with at most one hit;
    when the lookup is rate limited ``err`` is 1 and the single hit carries
    the explanation.
    """
    hits = []
    response = {'data': hits, 'err': 0}
    query = request.GET.get('q') or ''
    query = query.strip().upper()
    if re.match(r'^[BCDFGHJKLMNPQRSTVWXZ]{8}$', query):
        try:
            url = TrackingCodeView.search(query, request)
        except PermissionDenied:
            response['err'] = 1
            hits.append(
                {
                    'text': _('Looking up tracking code is currently rate limited.'),
                    'url': '#',
                }
            )
        else:
            if url:
                hits.append(
                    {
                        'text': _('Use tracking code %s') % query,
                        'url': url,
                    }
                )
    return JsonResponse(response)


def redirect_crypto_url(request, session_key, crypto_url):
    """Decrypt ``crypto_url`` and redirect to its signed form.

    Responds with 403 when ``session_key`` is not the current session key,
    when ``crypto_url`` cannot be decrypted with ``settings.SECRET_KEY``, or
    when no known service is found for the decrypted URL.  Otherwise an
    ``orig`` parameter is appended and the URL is signed with the service
    secret before redirecting.
    """
    if session_key != request.session.session_key:
        return HttpResponseForbidden()
    try:
        real_url = aes_hex_decrypt(settings.SECRET_KEY, crypto_url)
    except DecryptionError:
        return HttpResponseForbidden('invalid crypto url')

    service = get_known_service_for_url(real_url)
    if not service:
        # NOTE(review): assumes get_known_service_for_url returns a falsy
        # value when no known service matches the URL -- confirm.  Without
        # this guard the subscript below would fail with an unhandled error.
        return HttpResponseForbidden('unknown service')
    if '?' not in real_url:
        real_url += '?'
    real_url += '&orig=%s' % service['orig']
    redirect_url = sign_url(real_url, service['secret'])
    return HttpResponseRedirect(redirect_url)