misc: add rate limiting to tracking code URLs (#35395)
This commit is contained in:
parent
681c81c049
commit
4836ed82d8
|
@ -17,6 +17,8 @@
|
|||
import re
|
||||
|
||||
from django.contrib import messages
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import JsonResponse, HttpResponseRedirect, HttpResponseBadRequest
|
||||
from django.utils.http import urlquote
|
||||
|
@ -25,6 +27,8 @@ from django.utils.translation import ugettext_lazy as _
|
|||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.generic import View
|
||||
|
||||
import ratelimit.utils
|
||||
|
||||
from .models import TrackingCodeInputCell
|
||||
from .utils import get_wcs_services
|
||||
|
||||
|
@ -41,13 +45,25 @@ class TrackingCodeView(View):
|
|||
return super(TrackingCodeView, self).dispatch(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def search(self, code, wcs_site=None):
|
||||
def search(self, code, request, wcs_site=None):
|
||||
code = code.strip().upper()
|
||||
if wcs_site:
|
||||
wcs_sites = [get_wcs_services().get(wcs_site)]
|
||||
else:
|
||||
wcs_sites = get_wcs_services().values()
|
||||
|
||||
rate_limit_option = settings.WCS_TRACKING_CODE_RATE_LIMIT
|
||||
if rate_limit_option and rate_limit_option != 'none':
|
||||
for rate_limit in rate_limit_option.split():
|
||||
ratelimited = ratelimit.utils.is_ratelimited(
|
||||
request=request,
|
||||
group='trackingcode',
|
||||
key='ip',
|
||||
rate=rate_limit,
|
||||
increment=True)
|
||||
if ratelimited:
|
||||
raise PermissionDenied('rate limit reached (%s)' % rate_limit)
|
||||
|
||||
for wcs_site in wcs_sites:
|
||||
response = requests.get('/api/code/' + urlquote(code),
|
||||
remote_service=wcs_site, log_errors=False)
|
||||
|
@ -65,34 +81,53 @@ class TrackingCodeView(View):
|
|||
return HttpResponseBadRequest('Missing code')
|
||||
code = request.POST['code']
|
||||
|
||||
url = self.search(code, wcs_site=cell.wcs_site)
|
||||
if url:
|
||||
return HttpResponseRedirect(url)
|
||||
|
||||
next_url = request.POST.get('url') or '/'
|
||||
next_netloc = urlparse.urlparse(next_url).netloc
|
||||
if not (next_netloc and next_netloc != urlparse.urlparse(request.build_absolute_uri()).netloc):
|
||||
messages.error(self.request,
|
||||
_(u'The tracking code could not been found.'))
|
||||
else:
|
||||
if '?' in next_url:
|
||||
next_url += '&'
|
||||
redirect_to_other_domain = bool(
|
||||
next_netloc and next_netloc != urlparse.urlparse(request.build_absolute_uri()).netloc)
|
||||
|
||||
try:
|
||||
url = self.search(code, request, wcs_site=cell.wcs_site)
|
||||
except PermissionDenied:
|
||||
if redirect_to_other_domain:
|
||||
raise
|
||||
else:
|
||||
next_url += '?'
|
||||
next_url += 'unknown-tracking-code'
|
||||
messages.error(self.request,
|
||||
_(u'Looking up tracking code is currently rate limited.'))
|
||||
else:
|
||||
if url:
|
||||
return HttpResponseRedirect(url)
|
||||
if redirect_to_other_domain:
|
||||
if '?' in next_url:
|
||||
next_url += '&'
|
||||
else:
|
||||
next_url += '?'
|
||||
next_url += 'unknown-tracking-code'
|
||||
else:
|
||||
messages.error(self.request,
|
||||
_(u'The tracking code could not been found.'))
|
||||
|
||||
return HttpResponseRedirect(next_url)
|
||||
|
||||
|
||||
def tracking_code_search(request):
|
||||
hits = []
|
||||
response = {'data': hits, 'err': 0}
|
||||
query = request.GET.get('q') or ''
|
||||
query = query.strip().upper()
|
||||
if re.match(r'^[BCDFGHJKLMNPQRSTVWXZ]{8}$', query):
|
||||
url = TrackingCodeView.search(query)
|
||||
if url:
|
||||
try:
|
||||
url = TrackingCodeView.search(query, request)
|
||||
except PermissionDenied:
|
||||
response['err'] = 1
|
||||
hits.append({
|
||||
'text': _('Use tracking code %s') % query,
|
||||
'url': url,
|
||||
'text': _('Looking up tracking code is currently rate limited.'),
|
||||
'url': '#',
|
||||
})
|
||||
return JsonResponse({'data': hits})
|
||||
else:
|
||||
if url:
|
||||
hits.append({
|
||||
'text': _('Use tracking code %s') % query,
|
||||
'url': url,
|
||||
})
|
||||
return JsonResponse(response)
|
||||
|
|
|
@ -310,6 +310,9 @@ REQUESTS_TIMEOUT = 28
|
|||
# default duration of notifications (in days)
|
||||
COMBO_DEFAULT_NOTIFICATION_DURATION = 3
|
||||
|
||||
# tracking code throttling
|
||||
WCS_TRACKING_CODE_RATE_LIMIT = '3/s 1500/d'
|
||||
|
||||
# predefined slots for assets
|
||||
# example: {'banner': {'label': 'Banner image'}}
|
||||
COMBO_ASSET_SLOTS = {}
|
||||
|
|
|
@ -22,6 +22,7 @@ Depends: ${misc:Depends}, ${python:Depends},
|
|||
python-xstatic-roboto-fontface (<< 0.5.0.0),
|
||||
python-eopayment (>= 1.35),
|
||||
python-django-haystack (>= 2.4.0),
|
||||
python-django-ratelimit,
|
||||
python-sorl-thumbnail,
|
||||
python-pil,
|
||||
python-pywebpush,
|
||||
|
|
1
setup.py
1
setup.py
|
@ -161,6 +161,7 @@ setup(
|
|||
'python-dateutil',
|
||||
'djangorestframework>=3.3, <3.7',
|
||||
'django-haystack',
|
||||
'django-ratelimit<3',
|
||||
'whoosh',
|
||||
'sorl-thumbnail',
|
||||
'Pillow',
|
||||
|
|
|
@ -50,3 +50,12 @@ def admin_user():
|
|||
except User.DoesNotExist:
|
||||
user = User.objects.create_superuser('admin', email=None, password='admin')
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nocache(settings):
|
||||
settings.CACHES = {
|
||||
'default': {
|
||||
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
|
||||
}
|
||||
}
|
||||
|
|
|
@ -611,7 +611,7 @@ def test_manager_current_forms(app, admin_user):
|
|||
|
||||
|
||||
@wcs_present
|
||||
def test_tracking_code_cell(app):
|
||||
def test_tracking_code_cell(app, nocache):
|
||||
Page.objects.all().delete()
|
||||
page = Page(title='One', slug='index', template_name='standard')
|
||||
page.save()
|
||||
|
@ -713,8 +713,9 @@ def test_cell_assets(app, admin_user):
|
|||
assert u'>Picture — form title (test)<' in resp.text
|
||||
|
||||
@wcs_present
|
||||
def test_tracking_code_search(app):
|
||||
def test_tracking_code_search(app, nocache):
|
||||
assert len(app.get('/api/search/tracking-code/').json.get('data')) == 0
|
||||
assert app.get('/api/search/tracking-code/').json.get('err') == 0
|
||||
assert len(app.get('/api/search/tracking-code/?q=123').json.get('data')) == 0
|
||||
assert len(app.get('/api/search/tracking-code/?q=BBCCDFF').json.get('data')) == 0
|
||||
assert len(app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('data')) == 0
|
||||
|
@ -722,6 +723,35 @@ def test_tracking_code_search(app):
|
|||
assert len(app.get('/api/search/tracking-code/?q=BBCCDDFFG').json.get('data')) == 0
|
||||
assert len(app.get('/api/search/tracking-code/?q= cnphntfb').json.get('data')) == 1
|
||||
|
||||
@wcs_present
|
||||
def test_tracking_code_search_rate_limit(app):
|
||||
for i in range(3):
|
||||
assert app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('err') == 0
|
||||
assert app.get('/api/search/tracking-code/?q=BBCCDDFF').json.get('err') == 1
|
||||
|
||||
Page.objects.all().delete()
|
||||
page = Page(title='One', slug='index', template_name='standard')
|
||||
page.save()
|
||||
cell = TrackingCodeInputCell(page=page, placeholder='content', order=0)
|
||||
cell.save()
|
||||
|
||||
resp = app.get('/')
|
||||
for i in range(3): # make sure we hit ratelimit
|
||||
app.get('/api/search/tracking-code/?q=BBCCDDFF')
|
||||
resp.form['code'] = 'FOOBAR'
|
||||
resp = resp.form.submit()
|
||||
assert resp.status_code == 302
|
||||
resp = resp.follow()
|
||||
assert '<li class="error">Looking up tracking code is currently rate limited.</li>' in resp.text
|
||||
|
||||
resp = app.get('/')
|
||||
for i in range(3): # make sure we hit ratelimit
|
||||
app.get('/api/search/tracking-code/?q=BBCCDDFF')
|
||||
resp.form['code'] = 'FOOBAR'
|
||||
resp.form['url'] = 'http://example.net/'
|
||||
resp = resp.form.submit(status=403)
|
||||
|
||||
|
||||
@wcs_present
|
||||
def test_wcs_search_engines(app):
|
||||
search_engines = engines.get_engines()
|
||||
|
|
Loading…
Reference in New Issue