misc: add rate limiting to tracking code URLs (#35395)

This commit is contained in:
Frédéric Péters 2019-08-14 09:40:13 +02:00
parent 681c81c049
commit 4836ed82d8
6 changed files with 99 additions and 20 deletions

View File

@ -17,6 +17,8 @@
import re
from django.contrib import messages
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.http import JsonResponse, HttpResponseRedirect, HttpResponseBadRequest
from django.utils.http import urlquote
@ -25,6 +27,8 @@ from django.utils.translation import ugettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
import ratelimit.utils
from .models import TrackingCodeInputCell
from .utils import get_wcs_services
@ -41,13 +45,25 @@ class TrackingCodeView(View):
return super(TrackingCodeView, self).dispatch(*args, **kwargs)
@classmethod
def search(self, code, wcs_site=None):
def search(self, code, request, wcs_site=None):
code = code.strip().upper()
if wcs_site:
wcs_sites = [get_wcs_services().get(wcs_site)]
else:
wcs_sites = get_wcs_services().values()
rate_limit_option = settings.WCS_TRACKING_CODE_RATE_LIMIT
if rate_limit_option and rate_limit_option != 'none':
for rate_limit in rate_limit_option.split():
ratelimited = ratelimit.utils.is_ratelimited(
request=request,
group='trackingcode',
key='ip',
rate=rate_limit,
increment=True)
if ratelimited:
raise PermissionDenied('rate limit reached (%s)' % rate_limit)
for wcs_site in wcs_sites:
response = requests.get('/api/code/' + urlquote(code),
remote_service=wcs_site, log_errors=False)
@ -65,34 +81,53 @@ class TrackingCodeView(View):
return HttpResponseBadRequest('Missing code')
code = request.POST['code']
url = self.search(code, wcs_site=cell.wcs_site)
if url:
return HttpResponseRedirect(url)
next_url = request.POST.get('url') or '/'
next_netloc = urlparse.urlparse(next_url).netloc
if not (next_netloc and next_netloc != urlparse.urlparse(request.build_absolute_uri()).netloc):
messages.error(self.request,
_(u'The tracking code could not been found.'))
else:
if '?' in next_url:
next_url += '&'
redirect_to_other_domain = bool(
next_netloc and next_netloc != urlparse.urlparse(request.build_absolute_uri()).netloc)
try:
url = self.search(code, request, wcs_site=cell.wcs_site)
except PermissionDenied:
if redirect_to_other_domain:
raise
else:
next_url += '?'
next_url += 'unknown-tracking-code'
messages.error(self.request,
_(u'Looking up tracking code is currently rate limited.'))
else:
if url:
return HttpResponseRedirect(url)
if redirect_to_other_domain:
if '?' in next_url:
next_url += '&'
else:
next_url += '?'
next_url += 'unknown-tracking-code'
else:
messages.error(self.request,
_(u'The tracking code could not been found.'))
return HttpResponseRedirect(next_url)
def tracking_code_search(request):
hits = []
response = {'data': hits, 'err': 0}
query = request.GET.get('q') or ''
query = query.strip().upper()
if re.match(r'^[BCDFGHJKLMNPQRSTVWXZ]{8}$', query):
url = TrackingCodeView.search(query)
if url:
try:
url = TrackingCodeView.search(query, request)
except PermissionDenied:
response['err'] = 1
hits.append({
'text': _('Use tracking code %s') % query,
'url': url,
'text': _('Looking up tracking code is currently rate limited.'),
'url': '#',
})
return JsonResponse({'data': hits})
else:
if url:
hits.append({
'text': _('Use tracking code %s') % query,
'url': url,
})
return JsonResponse(response)

View File

@ -310,6 +310,9 @@ REQUESTS_TIMEOUT = 28
# default duration of notifications (in days)
COMBO_DEFAULT_NOTIFICATION_DURATION = 3
# tracking code throttling
WCS_TRACKING_CODE_RATE_LIMIT = '3/s 1500/d'
# predefined slots for assets
# example: {'banner': {'label': 'Banner image'}}
COMBO_ASSET_SLOTS = {}

1
debian/control vendored
View File

@ -22,6 +22,7 @@ Depends: ${misc:Depends}, ${python:Depends},
python-xstatic-roboto-fontface (<< 0.5.0.0),
python-eopayment (>= 1.35),
python-django-haystack (>= 2.4.0),
python-django-ratelimit,
python-sorl-thumbnail,
python-pil,
python-pywebpush,

View File

@ -161,6 +161,7 @@ setup(
'python-dateutil',
'djangorestframework>=3.3, <3.7',
'django-haystack',
'django-ratelimit<3',
'whoosh',
'sorl-thumbnail',
'Pillow',

View File

@ -50,3 +50,12 @@ def admin_user():
except User.DoesNotExist:
user = User.objects.create_superuser('admin', email=None, password='admin')
return user
@pytest.fixture
def nocache(settings):
settings.CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
}
}

View File

@ -611,7 +611,7 @@ def test_manager_current_forms(app, admin_user):
@wcs_present
def test_tracking_code_cell(app):
def test_tracking_code_cell(app, nocache):
Page.objects.all().delete()
page = Page(title='One', slug='index', template_name='standard')
page.save()
@ -713,8 +713,9 @@ def test_cell_assets(app, admin_user):
assert u'>Picture — form title (test)<' in resp.text
@wcs_present
def test_tracking_code_search(app):
def test_tracking_code_search(app, nocache):
assert len(app.get('/api/search/tracking-code/').json.get('data')) == 0
assert app.get('/api/search/tracking-code/').json.get('err') == 0
assert len(app.get('/api/search/tracking-code/?q=123').json.get('data')) == 0
assert len(app.get('/api/search/tracking-code/?q=BBCCDFF').json.get('data')) == 0
assert len(app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('data')) == 0
@ -722,6 +723,35 @@ def test_tracking_code_search(app):
assert len(app.get('/api/search/tracking-code/?q=BBCCDDFFG').json.get('data')) == 0
assert len(app.get('/api/search/tracking-code/?q= cnphntfb').json.get('data')) == 1
@wcs_present
def test_tracking_code_search_rate_limit(app):
for i in range(3):
assert app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('err') == 0
assert app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('err') == 1
Page.objects.all().delete()
page = Page(title='One', slug='index', template_name='standard')
page.save()
cell = TrackingCodeInputCell(page=page, placeholder='content', order=0)
cell.save()
resp = app.get('/')
for i in range(3): # make sure we hit ratelimit
app.get('/api/search/tracking-code/?q=BBCCDDFF')
resp.form['code'] = 'FOOBAR'
resp = resp.form.submit()
assert resp.status_code == 302
resp = resp.follow()
assert '<li class="error">Looking up tracking code is currently rate limited.</li>' in resp.text
resp = app.get('/')
for i in range(3): # make sure we hit ratelimit
app.get('/api/search/tracking-code/?q=BBCCDDFF')
resp.form['code'] = 'FOOBAR'
resp.form['url'] = 'http://example.net/'
resp = resp.form.submit(status=403)
@wcs_present
def test_wcs_search_engines(app):
search_engines = engines.get_engines()