combo/combo/apps/wcs/views.py

138 lines
5.1 KiB
Python

# combo - content management system
# Copyright (C) 2014-2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from django.contrib import messages
from django.conf import settings
from django.core.exceptions import DisallowedRedirect, PermissionDenied
from django.core.urlresolvers import reverse
from django.http import JsonResponse, HttpResponseRedirect, HttpResponseBadRequest
from django.utils.http import urlquote
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
import ratelimit.utils
from .models import TrackingCodeInputCell
from .utils import get_wcs_services
from combo.utils import requests
from combo.utils.misc import is_url_from_known_service
class TrackingCodeView(View):
http_method_names = ['post']
@csrf_exempt
def dispatch(self, *args, **kwargs):
# CSRF check must be disabled as the cell may be distributed to other
# sites in a skeleton.
return super(TrackingCodeView, self).dispatch(*args, **kwargs)
@classmethod
def search(self, code, request, wcs_site=None):
code = code.strip().upper()
if wcs_site:
wcs_sites = [get_wcs_services().get(wcs_site)]
else:
wcs_sites = get_wcs_services().values()
rate_limit_option = settings.WCS_TRACKING_CODE_RATE_LIMIT
if rate_limit_option and rate_limit_option != 'none':
for rate_limit in rate_limit_option.split():
ratelimited = ratelimit.utils.is_ratelimited(
request=request,
group='trackingcode',
key='ip',
rate=rate_limit,
increment=True)
if ratelimited:
raise PermissionDenied('rate limit reached (%s)' % rate_limit)
for wcs_site in wcs_sites:
response = requests.get('/api/code/' + urlquote(code),
remote_service=wcs_site, log_errors=False)
if response.status_code == 200 and response.json().get('err') == 0:
return response.json().get('load_url')
return None
def post(self, request, *args, **kwargs):
try:
cell = TrackingCodeInputCell.objects.get(id=request.POST['cell'])
except (KeyError, ValueError, TrackingCodeInputCell.DoesNotExist):
return HttpResponseBadRequest('Invalid cell id')
if not 'code' in request.POST:
return HttpResponseBadRequest('Missing code')
code = request.POST['code']
next_url = request.POST.get('url') or '/'
next_netloc = urlparse.urlparse(next_url).netloc
redirect_to_other_domain = bool(
next_netloc and next_netloc != urlparse.urlparse(request.build_absolute_uri()).netloc)
if redirect_to_other_domain and not is_url_from_known_service(next_url):
raise DisallowedRedirect('Unsafe redirect to unknown host')
try:
url = self.search(code, request, wcs_site=cell.wcs_site)
except PermissionDenied:
if redirect_to_other_domain:
raise
else:
messages.error(self.request,
_(u'Looking up tracking code is currently rate limited.'))
else:
if url:
return HttpResponseRedirect(url)
if redirect_to_other_domain:
if '?' in next_url:
next_url += '&'
else:
next_url += '?'
next_url += 'unknown-tracking-code'
else:
messages.error(self.request,
_(u'The tracking code could not been found.'))
return HttpResponseRedirect(next_url)
def tracking_code_search(request):
hits = []
response = {'data': hits, 'err': 0}
query = request.GET.get('q') or ''
query = query.strip().upper()
if re.match(r'^[BCDFGHJKLMNPQRSTVWXZ]{8}$', query):
try:
url = TrackingCodeView.search(query, request)
except PermissionDenied:
response['err'] = 1
hits.append({
'text': _('Looking up tracking code is currently rate limited.'),
'url': '#',
})
else:
if url:
hits.append({
'text': _('Use tracking code %s') % query,
'url': url,
})
return JsonResponse(response)