query: check slug and name unicity in forms (#43002)

This commit is contained in:
Lauréline Guérin 2020-06-02 11:17:21 +02:00
parent 9f7d7705de
commit 9c50291e3a
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
10 changed files with 292 additions and 31 deletions

View File

@ -0,0 +1,27 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from passerelle.base.forms import BaseQueryFormMixin
from . import models
class QueryForm(BaseQueryFormMixin, forms.ModelForm):
class Meta:
model = models.Query
fields = '__all__'
exclude = ['resource']

View File

@ -14,28 +14,22 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.views.generic import UpdateView, CreateView, DeleteView
from passerelle.base.mixins import ResourceChildViewMixin
from . import models
class QueryForm(forms.ModelForm):
class Meta:
model = models.Query
fields = '__all__'
exclude = ['resource']
from .forms import QueryForm
class QueryNew(ResourceChildViewMixin, CreateView):
model = models.Query
form_class = QueryForm
def form_valid(self, form):
form.instance.resource = self.resource
return super(QueryNew, self).form_valid(form)
def get_form_kwargs(self):
kwargs = super(QueryNew, self).get_form_kwargs()
kwargs['instance'] = self.model(resource=self.resource)
return kwargs
def get_changed_url(self):
return self.object.get_absolute_url()

View File

@ -0,0 +1,27 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from passerelle.base.forms import BaseQueryFormMixin
from . import models
class QueryForm(BaseQueryFormMixin, forms.ModelForm):
class Meta:
model = models.Query
fields = '__all__'
exclude = ['resource']

View File

@ -14,19 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.views.generic import UpdateView, CreateView, DeleteView
from passerelle.base.mixins import ResourceChildViewMixin
from . import models
class QueryForm(forms.ModelForm):
class Meta:
model = models.Query
fields = '__all__'
exclude = ['resource']
from .forms import QueryForm
class QueryNew(ResourceChildViewMixin, CreateView):
@ -34,9 +27,10 @@ class QueryNew(ResourceChildViewMixin, CreateView):
form_class = QueryForm
template_name = "passerelle/manage/resource_child_form.html"
def form_valid(self, form):
form.instance.resource = self.resource
return super(QueryNew, self).form_valid(form)
def get_form_kwargs(self):
kwargs = super(QueryNew, self).get_form_kwargs()
kwargs['instance'] = self.model(resource=self.resource)
return kwargs
class QueryEdit(ResourceChildViewMixin, UpdateView):

View File

@ -19,10 +19,11 @@ from xml.etree import ElementTree as ET
from django import forms
from django.utils.translation import ugettext_lazy as _
from passerelle.base.forms import BaseQueryFormMixin
from . import models
class QueryForm(forms.ModelForm):
class QueryForm(BaseQueryFormMixin, forms.ModelForm):
class Meta:
model = models.Query
fields = '__all__'

View File

@ -26,9 +26,10 @@ class QueryNew(ResourceChildViewMixin, CreateView):
model = models.Query
form_class = QueryForm
def form_valid(self, form):
form.instance.resource = self.resource
return super(QueryNew, self).form_valid(form)
def get_form_kwargs(self):
kwargs = super(QueryNew, self).get_form_kwargs()
kwargs['instance'] = self.model(resource=self.resource)
return kwargs
def get_changed_url(self):
return self.object.get_absolute_url()

View File

@ -33,3 +33,27 @@ class AvailabilityParametersForm(forms.ModelForm):
class ImportSiteForm(forms.Form):
site_json = forms.FileField(label=_('Site Export File'))
import_users = forms.BooleanField(label=_('Import users and access rights'), required=False)
class BaseQueryFormMixin(object):
def clean_slug(self):
slug = self.cleaned_data['slug']
queryset = self.instance.resource.queries.filter(slug=slug)
if self.instance.pk:
queryset = queryset.exclude(pk=self.instance.pk)
if queryset.exists():
raise forms.ValidationError(_('A query with this slug already exists'))
return slug
def clean_name(self):
name = self.cleaned_data['name']
queryset = self.instance.resource.queries.filter(name=name)
if self.instance.pk:
queryset = queryset.exclude(pk=self.instance.pk)
if queryset.exists():
raise forms.ValidationError(_('A query with this name already exists'))
return name

View File

@ -6,13 +6,18 @@ import pytest
import mock
import utils
from django.core.exceptions import ValidationError
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from passerelle.apps.arcgis.models import ArcGIS, validate_where, SqlFormatter, Query
from passerelle.base.models import ApiUser, AccessRight
from passerelle.utils import import_site
from test_manager import login
pytestmark = pytest.mark.django_db
# from http://sampleserver1.arcgisonline.com/ArcGIS/rest/services/fold/serv/MapServer/1
STATES = '''{
"fieldAliases" : {
@ -80,7 +85,12 @@ STATES = '''{
@pytest.fixture
def arcgis(db):
def admin_user():
return User.objects.create_superuser('admin', email=None, password='admin')
@pytest.fixture
def arcgis():
return ArcGIS.objects.create(slug='test',
base_url='https://arcgis.example.net/')
@ -417,6 +427,64 @@ def test_query_documentation(arcgis, query, app):
assert '<span class="param-name">adress</span>' in resp.text
def test_arcgis_query_unicity(admin_user, app, arcgis):
query = Query.objects.create(
resource=arcgis,
name='Test Query',
slug='test-query',
)
arcgis2 = ArcGIS.objects.create(
slug='test2',
base_url='https://arcgis.example.net/')
Query.objects.create(
resource=arcgis2,
name='Foo Bar',
slug='foo-bar',
)
app = login(app)
resp = app.get('/manage/arcgis/%s/query/new/' % arcgis.slug)
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp.form['service'] = 'Foo'
resp = resp.form.submit()
assert resp.status_code == 200
assert Query.objects.filter(resource=arcgis).count() == 1
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp.form['service'] = 'Foo'
resp = resp.form.submit()
assert Query.objects.filter(resource=arcgis).count() == 1
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp.form['service'] = 'Foo'
resp = resp.form.submit()
assert resp.status_code == 302
assert Query.objects.filter(resource=arcgis).count() == 2
new_query = Query.objects.latest('pk')
assert new_query.resource == arcgis
resp = app.get('/manage/arcgis/%s/query/%s/' % (arcgis.slug, new_query.pk))
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 302
def test_export_import(query):
assert ArcGIS.objects.count() == 1
assert Query.objects.count() == 1

View File

@ -21,10 +21,14 @@ import pytest
import utils
from django.contrib.auth.models import User
from passerelle.apps.opendatasoft.models import OpenDataSoft, Query
from passerelle.utils import import_site
from test_manager import login, admin_user
from test_manager import login
pytestmark = pytest.mark.django_db
FAKED_CONTENT_Q_SEARCH = json.dumps({
@ -155,7 +159,12 @@ FAKED_CONTENT_ID_SEARCH = json.dumps({
@pytest.fixture
def connector(db):
def admin_user():
return User.objects.create_superuser('admin', email=None, password='admin')
@pytest.fixture
def connector():
return utils.setup_access_rights(OpenDataSoft.objects.create(
slug='my_connector',
api_key='my_secret',
@ -289,3 +298,56 @@ def test_query_q_using_id(mocked_get, app, query):
resp = app.get(endpoint, params=params, status=200)
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['text'] == "19 RUE DE L'AUBEPINE Lipsheim"
def test_opendatasoft_query_unicity(admin_user, app, connector, query):
connector2 = OpenDataSoft.objects.create(
slug='my_connector2',
api_key='my_secret',
)
Query.objects.create(
resource=connector2,
name='Foo Bar',
slug='foo-bar',
)
app = login(app)
resp = app.get('/manage/opendatasoft/%s/query/new/' % connector.slug)
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp.form['dataset'] = 'my-dataset'
resp = resp.form.submit()
assert resp.status_code == 200
assert Query.objects.filter(resource=connector).count() == 1
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp.form['dataset'] = 'my-dataset'
resp = resp.form.submit()
assert Query.objects.filter(resource=connector).count() == 1
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp.form['dataset'] = 'my-dataset'
resp = resp.form.submit()
assert resp.status_code == 302
assert Query.objects.filter(resource=connector).count() == 2
new_query = Query.objects.latest('pk')
assert new_query.resource == connector
resp = app.get('/manage/opendatasoft/%s/query/%s/' % (connector.slug, new_query.pk))
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 302

View File

@ -2,14 +2,19 @@ import json
import mock
import pytest
from django.contrib.auth.models import User
from django.core.management import call_command
from passerelle.apps.opengis.models import OpenGIS, Query, FeatureCache
from passerelle.base.models import Job
from passerelle.utils import import_site
from test_manager import login
import utils
pytestmark = pytest.mark.django_db
FAKE_FEATURE_INFO = '''<?xml version="1.0" encoding="UTF-8"?>
<msGMLOutput
xmlns:gml="http://www.opengis.net/gml"
@ -314,7 +319,12 @@ FAKE_GEOLOCATED_FEATURE_CIRCLE = {
@pytest.fixture
def connector(db):
def admin_user():
return User.objects.create_superuser('admin', email=None, password='admin')
@pytest.fixture
def connector():
return utils.setup_access_rights(OpenGIS.objects.create(
slug='test',
wms_service_url='http://example.net/wms',
@ -805,6 +815,59 @@ def test_opengis_query_data_filter(mocked_get, app, connector, query):
assert 'extra parameter' in resp.json['err_desc']
def test_opengis_query_unicity(admin_user, app, connector, query):
connector2 = OpenGIS.objects.create(
slug='test2',
wms_service_url='http://example.net/wms',
wfs_service_url='http://example.net/wfs')
Query.objects.create(
resource=connector2,
name='Foo Bar',
slug='foo-bar',
)
app = login(app)
resp = app.get('/manage/opengis/%s/query/new/' % connector.slug)
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp.form['typename'] = 'foo'
resp = resp.form.submit()
assert resp.status_code == 200
assert Query.objects.filter(resource=connector).count() == 1
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp.form['typename'] = 'foo'
resp = resp.form.submit()
assert Query.objects.filter(resource=connector).count() == 1
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp.form['typename'] = 'foo'
resp = resp.form.submit()
assert resp.status_code == 302
assert Query.objects.filter(resource=connector).count() == 2
new_query = Query.objects.latest('pk')
assert new_query.resource == connector
resp = app.get('/manage/opengis/%s/query/%s/' % (connector.slug, new_query.pk))
resp.form['slug'] = query.slug
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this slug already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = query.name
resp = resp.form.submit()
assert resp.status_code == 200
assert 'A query with this name already exists' in resp.text
resp.form['slug'] = 'foo-bar'
resp.form['name'] = 'Foo Bar'
resp = resp.form.submit()
assert resp.status_code == 302
def test_opengis_export_import(query):
assert OpenGIS.objects.count() == 1
assert Query.objects.count() == 1