Added 'guichet' (planned availability), added CRUD views for each entity, many fixes and refactoring
Features: * A connector can now be associated with 'collectivites', and each of them can have one 'guichet'. A 'collectivite' has a name and an openADS technical identifier. A 'guichet' allows defining the hours and days during which its 'collectivite' is "open" * Each of these entities has a view to CRUD it * For entity ForwardFile: - Added association with a connector, and optionally a 'collectivite' - Added ForwardFile's 'size' field - Added updating of the fields 'content_type', 'file_hash' and 'size' on save() - Added custom validation on save() * For entity AtrealOpenads: - Added permissions on each endpoint - Added 'email' field support for each type of 'demandeur' - 'numero_dossier' is now required in the URL for some endpoints (previously it was a GET param) - Added 'upload_user_files' as an endpoint (it can still be used as a method with request=None) - The 'upload_user_files()' method now only handles forward files that have status='pending' Fixes: * Added "*args" and "**kwargs" arguments to some connector endpoint methods * Added 'verbose_name' and 'ordering' to each entity's Meta * Added decorator '@force_encoded_string_output' to prevent UTF-8 issues with python2 * Added __repr__, __str__ and __unicode__ functions to each entity * Added database indexes for each entity * Commented out useless JSON schema imports * Removed unused variable assignments Refactoring: * Moved utility functions to the utils.py file * Added a BaseModel to provide some default functions for Models * Added enums/translations for the hard-coded values of the ForwardFile 'upload_status' field Tests: * Added a test for each entity * Added test files specific to utilities, forms and views * Total code coverage is 99% with only 10 statements missed
This commit is contained in:
parent
e8de2e66e8
commit
990f1fb7bc
|
@ -0,0 +1,67 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from django.forms import ModelForm
|
||||
|
||||
from .models import ForwardFile, Collectivite, Guichet
|
||||
|
||||
class ForwardFileForm(ModelForm):
    """Form to edit a ForwardFile.

    'connecteur', 'size' and 'file_hash' are excluded because they are set
    programmatically (by the views and by ForwardFile.save()).
    """

    class Meta:
        model = ForwardFile
        exclude = ['connecteur', 'size', 'file_hash']

    def __init__(self, *args, **kwargs):
        # owner objects optionally injected by the calling view
        connecteur = kwargs.pop('connecteur', None)
        collectivite = kwargs.pop('collectivite', None)

        super(ForwardFileForm, self).__init__(*args, **kwargs)

        # attach the owners to the instance when they are not already set
        # (getattr() returns None for an unset foreign key as Django's
        # RelatedObjectDoesNotExist subclasses AttributeError)
        if not getattr(self.instance, 'connecteur', None) and connecteur:
            self.instance.connecteur = connecteur
        if not getattr(self.instance, 'collectivite', None) and collectivite:
            self.instance.collectivite = collectivite

        # only allow to select a 'collectivite' that belongs to the connecteur
        if getattr(self.instance, 'connecteur', None):
            self.fields['collectivite'].queryset = Collectivite.objects.filter(
                connecteur=self.instance.connecteur)

        # TODO if the status is 'uploading' make everything read-only
|
||||
|
||||
|
||||
class CollectiviteForm(ModelForm):
    """Form to edit a Collectivite.

    The owning 'connecteur' is excluded because it is injected by the views.
    """

    class Meta:
        model = Collectivite
        exclude = ['connecteur']

    def __init__(self, *args, **kwargs):
        # owner connector optionally injected by the calling view
        connecteur = kwargs.pop('connecteur', None)
        super(CollectiviteForm, self).__init__(*args, **kwargs)
        # attach the owner when the instance does not have one yet
        if not getattr(self.instance, 'connecteur', None) and connecteur:
            self.instance.connecteur = connecteur
|
||||
|
||||
|
||||
class GuichetForm(ModelForm):
    """Form to edit a Guichet.

    The owning 'collectivite' is excluded because it is injected by the views.
    """

    class Meta:
        model = Guichet
        exclude = ['collectivite']

    def __init__(self, *args, **kwargs):
        # owner collectivite optionally injected by the calling view
        collectivite = kwargs.pop('collectivite', None)
        super(GuichetForm, self).__init__(*args, **kwargs)
        # attach the owner when the instance does not have one yet
        if not getattr(self.instance, 'collectivite', None) and collectivite:
            self.instance.collectivite = collectivite
|
|
@ -1,9 +1,10 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.15 on 2019-07-18 20:51
|
||||
# Generated by Django 1.11.18 on 2019-08-20 15:02
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import atreal_openads.models
|
||||
import atreal_openads.utils
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
@ -28,13 +29,30 @@ class Migration(migrations.Migration):
|
|||
('trusted_certificate_authorities', models.FileField(blank=True, null=True, upload_to=b'', verbose_name='TLS trusted CAs')),
|
||||
('verify_cert', models.BooleanField(default=True, verbose_name='TLS verify certificates')),
|
||||
('http_proxy', models.CharField(blank=True, max_length=128, verbose_name='HTTP and HTTPS proxy')),
|
||||
('collectivite', models.CharField(blank=True, default=b'', help_text='ex: Marseille, or ex: 3', max_length=255, verbose_name='Collectivity (identifier)')),
|
||||
('default_collectivite_openADS_id', models.PositiveIntegerField(blank=True, default=0, help_text='ex: 3', verbose_name="Default 'collectivite' (identifier in openADS)")),
|
||||
('openADS_API_url', models.URLField(default=b'', help_text='ex: https://openads.your_domain.net/api/', max_length=255, verbose_name='openADS API URL')),
|
||||
('users', models.ManyToManyField(blank=True, related_name='_atrealopenads_users_+', related_query_name='+', to='base.ApiUser')),
|
||||
],
|
||||
options={
|
||||
'ordering': ['openADS_API_url'],
|
||||
'verbose_name': 'openADS',
|
||||
'verbose_name_plural': 'openADS',
|
||||
},
|
||||
bases=(models.Model, atreal_openads.utils.BaseModel),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Collectivite',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('name', models.CharField(blank=True, default=b'', max_length=150)),
|
||||
('openADS_id', models.PositiveIntegerField(help_text='ex: 3', verbose_name='openADS identifier')),
|
||||
('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='collectivites', related_query_name='collectivite', to='atreal_openads.AtrealOpenads')),
|
||||
],
|
||||
options={
|
||||
'ordering': ['name'],
|
||||
'verbose_name': 'Collectivite',
|
||||
},
|
||||
bases=(models.Model, atreal_openads.utils.BaseModel),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='ForwardFile',
|
||||
|
@ -46,11 +64,90 @@ class Migration(migrations.Migration):
|
|||
('file_hash', models.CharField(blank=True, default=b'', max_length=100)),
|
||||
('orig_filename', models.CharField(blank=True, default=b'', max_length=100)),
|
||||
('content_type', models.CharField(blank=True, default=b'', max_length=100)),
|
||||
('upload_file', models.FileField(null=True, upload_to=atreal_openads.models.get_upload_path)),
|
||||
('size', models.PositiveIntegerField(default=0)),
|
||||
('upload_file', models.FileField(blank=True, null=True, upload_to=atreal_openads.utils.get_upload_path)),
|
||||
('upload_attempt', models.PositiveIntegerField(blank=True, default=0)),
|
||||
('upload_status', models.CharField(blank=True, default=b'', max_length=10)),
|
||||
('upload_status', models.CharField(choices=[(b'pending', 'Pending'), (b'uploading', 'Uploading'), (b'failed', 'Failed'), (b'success', 'Success')], default=b'pending', max_length=10)),
|
||||
('upload_msg', models.CharField(blank=True, default=b'', max_length=255)),
|
||||
('last_update_datetime', models.DateTimeField(auto_now=True)),
|
||||
('collectivite', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.Collectivite')),
|
||||
('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.AtrealOpenads')),
|
||||
],
|
||||
options={
|
||||
'ordering': ['-last_update_datetime'],
|
||||
'verbose_name': 'Forward File',
|
||||
},
|
||||
bases=(models.Model, atreal_openads.utils.BaseModel),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Guichet',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('ouverture_jour_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each day)')),
|
||||
('fermeture_jour_h', models.TimeField(help_text='ex: 17:00', verbose_name='Hour of closing (each day)')),
|
||||
('ouverture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=1, help_text='ex: Lundi', verbose_name='Day of opening (each week)')),
|
||||
('fermeture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=6, help_text='ex: Samedi', verbose_name='Day of closing (each week)')),
|
||||
('ouverture_sem_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each week)')),
|
||||
('fermeture_sem_h', models.TimeField(help_text='ex: 12:15', verbose_name='Hour of closing (each week)')),
|
||||
('collectivite', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='guichet', to='atreal_openads.Collectivite')),
|
||||
],
|
||||
options={
|
||||
'ordering': ['collectivite'],
|
||||
'verbose_name': 'Guichet',
|
||||
'verbose_name_plural': 'Guichets Urbanisme',
|
||||
},
|
||||
bases=(models.Model, atreal_openads.utils.BaseModel),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='guichet',
|
||||
index=models.Index(fields=[b'collectivite'], name=b'su_collectivite_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'connecteur'], name=b'ff_connecteur_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'collectivite'], name=b'ff_collectivite_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'numero_demande', b'numero_dossier'], name=b'ff_deman_doss_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'numero_demande'], name=b'ff_demande_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'numero_dossier'], name=b'ff_dossier_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'orig_filename'], name=b'ff_filename_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'upload_status'], name=b'ff_status_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='forwardfile',
|
||||
index=models.Index(fields=[b'last_update_datetime'], name=b'ff_last_up_dt_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='collectivite',
|
||||
index=models.Index(fields=[b'connecteur', b'openADS_id'], name=b'col_conn_openADSid_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='collectivite',
|
||||
index=models.Index(fields=[b'connecteur'], name=b'col_connecteur_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='collectivite',
|
||||
index=models.Index(fields=[b'openADS_id'], name=b'col_openADS_id_idx'),
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name='collectivite',
|
||||
unique_together=set([('connecteur', 'openADS_id')]),
|
||||
),
|
||||
]
|
||||
|
|
|
@ -24,161 +24,374 @@ import datetime
|
|||
import os
|
||||
import re
|
||||
import magic
|
||||
import hashlib
|
||||
import copy
|
||||
|
||||
from HTMLParser import HTMLParser
|
||||
|
||||
from django.db import models
|
||||
from django.http import Http404
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.core.files import File
|
||||
from django.core.files.base import ContentFile
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from passerelle.base.models import BaseResource, HTTPResource
|
||||
from passerelle.utils.api import endpoint
|
||||
from passerelle.utils.jsonresponse import APIError
|
||||
|
||||
from .json_schemas import (
|
||||
JSON_SCHEMA_CHECK_STATUS_OUT,
|
||||
JSON_SCHEMA_CREATE_DOSSIER_IN,
|
||||
JSON_SCHEMA_CREATE_DOSSIER_OUT,
|
||||
JSON_SCHEMA_GET_DOSSIER_OUT,
|
||||
JSON_SCHEMA_GET_FWD_FILES_OUT,
|
||||
JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT,
|
||||
JSON_SCHEMA_GET_COURRIER_OUT
|
||||
#JSON_SCHEMA_CHECK_STATUS_OUT,
|
||||
#JSON_SCHEMA_CREATE_DOSSIER_OUT,
|
||||
#JSON_SCHEMA_GET_DOSSIER_OUT,
|
||||
#JSON_SCHEMA_GET_FWD_FILES_OUT,
|
||||
#JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT,
|
||||
#JSON_SCHEMA_GET_COURRIER_OUT
|
||||
)
|
||||
|
||||
from .utils import (
|
||||
force_encoded_string_output,
|
||||
strip_tags,
|
||||
clean_spaces,
|
||||
normalize,
|
||||
get_file_digest,
|
||||
get_upload_path,
|
||||
get_file_extension,
|
||||
trunc_str_values,
|
||||
DictDumper,
|
||||
BaseModel
|
||||
)
|
||||
|
||||
|
||||
class MLStripper(HTMLParser):
    """HTML parser that removes html tags."""
    def __init__(self):
        # python2's HTMLParser is an old-style class whose __init__ only
        # calls reset(); calling reset() directly is the canonical idiom
        self.reset()
        # accumulates the text chunks found outside of tags
        self.fed = []
    def handle_data(self, d):
        """Collect text data found between tags."""
        self.fed.append(d)
    def get_data(self):
        """Return all collected text joined together."""
        return ''.join(self.fed)
|
||||
|
||||
|
||||
def strip_tags(html):
    """Return *html* with all markup tags removed, keeping only the text."""
    stripper = MLStripper()
    stripper.feed(html)
    return stripper.get_data()
|
||||
|
||||
|
||||
def clean_spaces(text):
    """Collapse whitespace in *text*.

    Line breaks and tabs — both literal characters and their escaped
    two-character forms ('\\n', '\\r', '\\t') — become single spaces, runs
    of spaces are collapsed, and the result is stripped.
    """
    for token in ('\n', '\r', '\t', '\\n', '\\r', '\\t'):
        text = text.replace(token, ' ')
    return re.sub(r' +', ' ', text).strip()
|
||||
|
||||
|
||||
def normalize(value):
    """Coerce *value* to a clean unicode string to be sent to openADS.API.

    None becomes the empty string; anything else is converted to unicode
    and has its whitespace collapsed by clean_spaces().
    """
    if value is None:
        return ''
    text = value if isinstance(value, unicode) else unicode(value)
    return clean_spaces(text)
|
||||
|
||||
|
||||
def get_file_data(path, b64=True):
    """Return the content of a file as a string, in base64 if specified.

    The file is opened in binary mode ('rb'): the previous text mode ('r')
    corrupts binary content on platforms that translate line endings, and
    base64-encoding is only meaningful on raw bytes anyway.
    """
    with open(path, 'rb') as f:
        data = f.read()
    return base64.b64encode(data) if b64 else data
|
||||
|
||||
|
||||
def get_upload_path(instance, filename=None):
    """Return a relative upload path for a file.

    Constraints:
    - openADS only accepts filenames shorter than 50 chars
    - the name must be unique even when two files have the same content,
      hence the microsecond timestamp plus a short hash prefix
    """
    timestamp = datetime.datetime.now().strftime('%Y-%b-%d_%Hh%Mm%Ss%f')
    return 'pass_openADS_up_%s_%s' % (timestamp, instance.file_hash[:4])
|
||||
|
||||
|
||||
def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
    """Truncate a string value (not dict keys) and append a truncate text.

    Recurses into dicts, lists and tuples, mutating containers in place;
    `visited` guards against infinite recursion on cyclic structures.

    NOTE(review): membership in `visited` uses equality, not identity, so a
    distinct-but-equal container is skipped once one copy was seen — confirm
    this is intended.
    NOTE(review): the tuple branch would raise TypeError on `value[k] = ...`
    since tuples are immutable — confirm tuples never actually reach here.
    """

    if visited is None:
        visited = []
    if not value in visited:
        if isinstance(value, basestring) and len(value) > limit:
            value = value[:limit] + truncate_text
        elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
            # remember this container before recursing to break cycles
            visited.append(value)
            # iteritems() is python2-only; enumerate covers lists/tuples
            iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
            for k,v in iterator:
                value[k] = trunc_str_values(v, limit, visited, truncate_text)
    return value
|
||||
|
||||
|
||||
class DictDumper(object):
    """Helper to dump a dictionary to a string representation with lazy processing.

    Only applied when dict is converted to string (lazy processing):
    - long strings truncated (after the dict has been 'deep' copied)
    - (optionally) dict converted with json.dumps instead of unicode().
    """

    def __init__(self, dic, max_str_len=255, use_json_dumps=True):
        """ arguments:
        - dic            dict     the dict to dump
        - max_str_len    integer  the maximum length of string values
        - use_json_dumps boolean  True to use json.dumps() else it uses unicode()
        """
        self.dic = dic
        self.max_str_len = max_str_len
        self.use_json_dumps = use_json_dumps

    def __str__(self):
        # deep copy first so truncation never mutates the caller's dict
        dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
        # NOTE(review): 'json' is not imported in this view of the file —
        # confirm it is imported at module top level
        dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
        return unicode(dict_ref)
|
||||
|
||||
|
||||
class ForwardFile(models.Model):
|
||||
class ForwardFile(models.Model, BaseModel):
|
||||
"""Represent a file uploaded by a user, to be forwarded to openADS.API."""
|
||||
|
||||
STATUSES = [
|
||||
('pending' , _('Pending')),
|
||||
('uploading', _('Uploading')),
|
||||
('failed' , _('Failed')),
|
||||
('success' , _('Success'))
|
||||
]
|
||||
|
||||
connecteur = models.ForeignKey('AtrealOpenads',
|
||||
on_delete=models.CASCADE,
|
||||
related_name="forward_files",
|
||||
related_query_name="forward_file")
|
||||
collectivite = models.ForeignKey('Collectivite', blank=True, null=True,
|
||||
on_delete=models.CASCADE,
|
||||
related_name="forward_files",
|
||||
related_query_name="forward_file")
|
||||
numero_demande = models.CharField(max_length=20)
|
||||
numero_dossier = models.CharField(max_length=20)
|
||||
type_fichier = models.CharField(max_length=10)
|
||||
file_hash = models.CharField(max_length=100, default='', blank=True)
|
||||
orig_filename = models.CharField(max_length=100, default='', blank=True)
|
||||
content_type = models.CharField(max_length=100, default='', blank=True)
|
||||
upload_file = models.FileField(upload_to=get_upload_path, null=True)
|
||||
size = models.PositiveIntegerField(default=0)
|
||||
upload_file = models.FileField(upload_to=get_upload_path, blank=True, null=True)
|
||||
upload_attempt = models.PositiveIntegerField(default=0, blank=True)
|
||||
upload_status = models.CharField(max_length=10, default='', blank=True)
|
||||
upload_status = models.CharField(max_length=10, choices=STATUSES, default='pending')
|
||||
upload_msg = models.CharField(max_length=255, default='', blank=True)
|
||||
last_update_datetime = models.DateTimeField(auto_now=True)
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Forward File')
|
||||
indexes = [
|
||||
models.Index(fields=['connecteur'] , name='ff_connecteur_idx'),
|
||||
models.Index(fields=['collectivite'] , name='ff_collectivite_idx'),
|
||||
models.Index(fields=['numero_demande', 'numero_dossier'], name='ff_deman_doss_idx'),
|
||||
models.Index(fields=['numero_demande'], name='ff_demande_idx'),
|
||||
models.Index(fields=['numero_dossier'], name='ff_dossier_idx'),
|
||||
models.Index(fields=['orig_filename'] , name='ff_filename_idx'),
|
||||
models.Index(fields=['upload_status'] , name='ff_status_idx'),
|
||||
models.Index(fields=['last_update_datetime'], name='ff_last_up_dt_idx')
|
||||
]
|
||||
ordering = ['-last_update_datetime']
|
||||
|
||||
class AtrealOpenads(BaseResource, HTTPResource):
|
||||
def get_status(self, status_codename=None):
    """Return the upload status human name translated.
    If specified codename is not found, return it.
    """
    # fall back to this instance's own status when no codename is given
    codename = status_codename or self.upload_status
    # STATUSES is a list of (codename, translated label) pairs
    return dict(self.STATUSES).get(codename, codename)
|
||||
|
||||
@force_encoded_string_output
def __repr__(self):
    """Debug representation listing ids and key fields.

    hasattr() guards: foreign-key attributes may be unset on unsaved
    instances and would raise when accessed directly.
    """
    return u'ForwardFile(id=%s,connecteur=%s,collectivite=%s,demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % (
        self.id,
        unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
        unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
        self.numero_demande, self.numero_dossier,
        self.type_fichier, self.orig_filename, self.upload_status)
|
||||
|
||||
def __unicode__(self):
    # short human-readable label: truncated filename + translated status
    return u"%s[%s]" % (trunc_str_values(self.orig_filename, 20), self.get_status())
|
||||
|
||||
def get_url_params(self, *args, **kwargs):
    """Extend the parent's URL params with the owning connecteur's slug."""
    params = super(ForwardFile, self).get_url_params(*args, **kwargs)
    params['connecteur'] = self.connecteur.slug if self.connecteur else None
    return params
|
||||
|
||||
def update_content_type(self, only_if_empty=False):
    """Update the content type from the content of the file.

    With only_if_empty=True an already-set content_type is kept as-is.
    """
    if self.content_type and only_if_empty:
        return
    if self.upload_file and self.upload_file.size:
        # sniff the MIME type from the first bytes of the file
        self.content_type = magic.from_buffer(self.upload_file.read(1024), mime=True)
    else:
        self.content_type = ''
|
||||
|
||||
def update_file_hash(self, only_if_empty=False):
    """Update the file_hash field from the content of the file.

    With only_if_empty=True an already-set file_hash is kept as-is.
    """
    if self.file_hash and only_if_empty:
        return
    has_content = bool(self.upload_file and self.upload_file.size)
    self.file_hash = get_file_digest(self.upload_file) if has_content else ''
|
||||
|
||||
# preprocessing data and validate model before saving
# /!\ Attention: this will not be triggered when doing bulk actions like with QuerySet.update()
# @see: https://docs.djangoproject.com/en/2.2/topics/db/models/#overriding-predefined-model-methods
# The note entitled "Overridden model methods are not called on bulk operations"
def save(self, *args, **kwargs):
    """Refresh file metadata (or drop the file on success), validate, persist."""
    # delete file content (on success): once forwarded to openADS the
    # local copy is useless, so free the storage
    if self.upload_status == 'success':
        if self.upload_file and self.upload_file.size > 0:
            self.upload_file.delete()
    # else, update metadata from the current file content
    else:
        self.size = self.upload_file.size if self.upload_file else 0
        self.update_file_hash()
        # only fill the content type when missing (only_if_empty=True)
        self.update_content_type(only_if_empty=True)
    # validation (calling self.clean())
    self.full_clean()
    super(ForwardFile, self).save(*args, **kwargs)
|
||||
|
||||
# at least one of the fields 'file_hash', 'orig_filename', 'upload_file'
# must not be blank/null, because if they are all empty we don't have any
# useful information about the upload
def clean(self, *args, **kwargs):
    """Validate the instance on top of the parent validation.

    @raise ValidationError when 'file_hash', 'orig_filename' and
           'upload_file' are all empty.
    """
    ret = super(ForwardFile, self).clean(*args, **kwargs)
    if (not self.file_hash
        and not self.orig_filename
        and (not self.upload_file or not self.upload_file.size)
    ):
        # translate the message template first, then interpolate: the
        # previous _("..." % args) form looked up the already-interpolated
        # string in the translation catalog, which can never match a msgid
        raise ValidationError(
            _("A %s cannot have all the following fields empty: %s.") % (
                self.get_verbose_name(),
                ['file_hash', 'orig_filename', 'upload_file'])
        )
    return ret
|
||||
|
||||
|
||||
class Collectivite(models.Model, BaseModel):
    """Represent a "collectivite"."""

    # display name; optional because only the openADS id is mandatory
    name = models.CharField(max_length=150, default='', blank=True)
    connecteur = models.ForeignKey('AtrealOpenads',
                                   on_delete=models.CASCADE,
                                   related_name="collectivites",
                                   related_query_name="collectivite")
    openADS_id = models.PositiveIntegerField(_('openADS identifier'), help_text=_('ex: 3'))

    # 'guichet' will be a property provided by the one-to-one relation of Guichet
    # 'forward_files' will be a property provided by the related_name of the foreignKey

    class Meta:
        verbose_name = _('Collectivite')
        # an openADS id is unique within one connector
        unique_together = ['connecteur', 'openADS_id']
        indexes = [
            models.Index(fields=['connecteur', 'openADS_id'], name='col_conn_openADSid_idx'),
            models.Index(fields=['connecteur'], name='col_connecteur_idx'),
            models.Index(fields=['openADS_id'], name='col_openADS_id_idx')
        ]
        ordering = ['name']

    @classmethod
    def get_fields(cls, *args, **kwargs):
        """Return the model fields with related fields moved to the end."""
        # get_fields() return is immutable, hence the copy
        fields = [f for f in super(Collectivite, cls).get_fields(*args, **kwargs)]
        # moving related fields field at the end of the list
        if fields:
            rels = []
            for rel_name in ['forward_file', 'guichet']:
                # NOTE(review): only inspects the head of the list, which
                # assumes related fields come first and in this exact order
                # — confirm against BaseModel.get_fields()
                if (fields[0]
                    and hasattr(fields[0], 'name')
                    and fields[0].name == rel_name
                ):
                    rels.append(fields.pop(0))
            for rel in reversed(rels):
                fields.append(rel)
        return fields

    @force_encoded_string_output
    def __repr__(self):
        # hasattr() guards: relations may be unset on unsaved instances
        return u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % (
            self.id, unicode(self.name),
            unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
            self.openADS_id,
            unicode(self.guichet) if hasattr(self, 'guichet') else None)

    def __unicode__(self):
        return self.name if isinstance(self.name, unicode) else unicode(self.name)

    def get_fields_kv(self, *args, **kwargs):
        """Return (field, value) pairs with related fields moved to the end."""
        fields = super(Collectivite, self).get_fields_kv(*args, **kwargs)
        # moving related fields field at the end of the list
        # (same head-of-list assumption as in get_fields() above)
        if fields:
            rels = []
            for rel_name in ['forward_file', 'guichet']:
                if (fields[0] and fields[0][0]
                    and hasattr(fields[0][0], 'name')
                    and fields[0][0].name == rel_name
                ):
                    rels.append(fields.pop(0))
            for rel in reversed(rels):
                fields.append(rel)
        return fields

    def get_url_params(self, *args, **kwargs):
        """Extend the parent's URL params with the owning connecteur's slug."""
        params = super(Collectivite, self).get_url_params(*args, **kwargs)
        params['connecteur'] = self.connecteur.slug if self.connecteur else None
        return params
|
||||
|
||||
|
||||
class Guichet(models.Model, BaseModel):
    """Represent a "Guichet" (the weekly opening schedule of a Collectivite)."""

    # ISO weekday numbers, matching datetime.isoweekday(): Monday=1 .. Sunday=7
    DAYS = [
        (1, _('Monday')),
        (2, _('Tuesday')),
        (3, _('Wednesday')),
        (4, _('Thursday')),
        (5, _('Friday')),
        (6, _('Saturday')),
        (7, _('Sunday'))
    ]

    # one schedule per collectivite (one-to-one)
    collectivite = models.OneToOneField('Collectivite',
                                        on_delete=models.CASCADE,
                                        related_name="guichet")
    ouverture_jour_h = models.TimeField(_('Hour of opening (each day)'), help_text=_('ex: 08:30'))
    fermeture_jour_h = models.TimeField(_('Hour of closing (each day)'), help_text=_('ex: 17:00'))
    ouverture_sem_d = models.PositiveIntegerField(_('Day of opening (each week)'), help_text=_('ex: Lundi'), choices=DAYS, default=1)
    fermeture_sem_d = models.PositiveIntegerField(_('Day of closing (each week)'), help_text=_('ex: Samedi'), choices=DAYS, default=6)
    ouverture_sem_h = models.TimeField(_('Hour of opening (each week)'), help_text=_('ex: 08:30'))
    fermeture_sem_h = models.TimeField(_('Hour of closing (each week)'), help_text=_('ex: 12:15'))

    class Meta:
        verbose_name = _('Guichet')
        verbose_name_plural = _('Guichets Urbanisme')
        indexes = [
            models.Index(fields=['collectivite'], name='su_collectivite_idx')
        ]
        ordering = ['collectivite']

    @force_encoded_string_output
    def __repr__(self):
        return u'Guichet(id=%s,collectivite=%s,%s)' % (
            self.id,
            unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
            unicode(self))

    def __unicode__(self):
        # ex: u'Monday 08:30 -> Saturday 12:15 [08:30/17:00]'
        return u'%s %s -> %s %s [%s/%s]' % (
            unicode(self.DAYS[self.ouverture_sem_d - 1][1]),
            self.ouverture_sem_h.strftime('%H:%M') if self.ouverture_sem_h else None,
            unicode(self.DAYS[self.fermeture_sem_d - 1][1]),
            self.fermeture_sem_h.strftime('%H:%M') if self.fermeture_sem_h else None,
            self.ouverture_jour_h.strftime('%H:%M') if self.ouverture_jour_h else None,
            self.fermeture_jour_h.strftime('%H:%M') if self.fermeture_jour_h else None)

    def get_url_params(self, *args, **kwargs):
        """Extend the parent's URL params with collectivite id and connecteur slug."""
        params = super(Guichet, self).get_url_params(*args, **kwargs)
        params['collectivite'] = self.collectivite.id if self.collectivite else None
        params['connecteur'] = self.collectivite.connecteur.slug if self.collectivite else None
        return params

    def get_list_url(self):
        # a Guichet has no list view (one per collectivite), make misuse loud
        raise Exception(u"Guichet:get_list_url() method should not be called")

    # @raise TypeError if argument is not a datetime object
    def is_open(self, dt):
        """ Return 'True' if the "Guichet" is open, else False."""
        if dt:
            if not isinstance(dt, datetime.datetime):
                raise TypeError(u"is_open() expect a datetime object (not a %s)" % type(dt))

            # daily opening/closing instants on the same date as 'dt'
            ouverture_jour_dt = datetime.datetime.combine(dt, self.ouverture_jour_h)
            fermeture_jour_dt = datetime.datetime.combine(dt, self.fermeture_jour_h)
            day = dt.isoweekday()
            # NOTE(review): comparisons are strict, so the exact opening and
            # closing instants count as closed; the logic also assumes
            # ouverture_sem_d < fermeture_sem_d (no week wrap-around) — confirm
            return (
                # opening day
                (day == self.ouverture_sem_d
                    and dt.time() > self.ouverture_sem_h and dt < fermeture_jour_dt)
                # closing day
                or (day == self.fermeture_sem_d
                    and dt.time() < self.fermeture_sem_h and dt > ouverture_jour_dt)
                # regular days
                or ( day > self.ouverture_sem_d
                    and day < self.fermeture_sem_d
                    and dt > ouverture_jour_dt
                    and dt < fermeture_jour_dt
                )
            )

        return False
|
||||
|
||||
|
||||
class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
|
||||
"""API that proxy/relay communications with/to openADS."""
|
||||
|
||||
collectivite = models.CharField(_('Collectivity (identifier)'), max_length=255,
|
||||
help_text=_('ex: Marseille, or ex: 3'), default='', blank=True)
|
||||
default_collectivite_openADS_id = models.PositiveIntegerField(_("Default 'collectivite' (identifier in openADS)"),
|
||||
help_text=_('ex: 3'), default=0, blank=True)
|
||||
openADS_API_url = models.URLField(_('openADS API URL'), max_length=255,
|
||||
help_text=_('ex: https://openads.your_domain.net/api/'), default='')
|
||||
|
||||
openADS_API_timeout = 3600
|
||||
|
||||
category = _('Business Process Connectors')
|
||||
# 'collectivites' will be a property provided by the related_name of the foreignKey
|
||||
# 'forward_files' will be a property provided by the related_name of the foreignKey
|
||||
|
||||
api_description = _('''This API provides exchanges with openADS.''')
|
||||
|
||||
category = _('Business Process Connectors')
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('openADS')
|
||||
verbose_name_plural = _('openADS')
|
||||
ordering = ['openADS_API_url']
|
||||
|
||||
@classmethod
|
||||
def get_class_name_plural(cls, *args, **kwargs):
|
||||
return cls.get_class_name(*args, **kwargs)
|
||||
|
||||
@force_encoded_string_output
|
||||
def __repr__(self):
|
||||
return u'AtrealOpenads(id=%s,openADS=%s,login=%s,collectivites=%s,default=%s)' % (
|
||||
self.id,
|
||||
unicode(self.openADS_API_url),
|
||||
unicode(self.basic_auth_username),
|
||||
self.collectivites.count(),
|
||||
self.default_collectivite_openADS_id)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.slug if isinstance(self.slug, unicode) else unicode(self.slug)
|
||||
|
||||
def get_url_name(self, prefix=''):
|
||||
return '%s%s' % (prefix + '-' if prefix else '', 'connector')
|
||||
|
||||
def get_url_params(self, primary_key=True):
|
||||
params = {'connector': 'atreal-openads'}
|
||||
if primary_key:
|
||||
params['slug'] = self.slug
|
||||
return params
|
||||
|
||||
def get_list_url(self):
|
||||
raise Exception(u"AtrealOpenads:get_list_url() method should not be called")
|
||||
|
||||
def get_collectivite(self, openADS_id):
|
||||
"""Return the 'collectivite' matching an openADS id."""
|
||||
return Collectivite.objects.get(connecteur=self,openADS_id=openADS_id)
|
||||
|
||||
def log_json_payload(self, payload, title='payload', max_str_len=100):
|
||||
"""Log a json paylod surrounded by dashes and with file content filtered."""
|
||||
|
@ -186,7 +399,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
self.logger.debug(u"%s", DictDumper(payload, max_str_len))
|
||||
self.logger.debug(u"----- %s (end) -----", title)
|
||||
|
||||
|
||||
def get_files_from_json_payload(self, payload, title='payload'):
|
||||
"""Return files from a JSON payload with all checks and logging."""
|
||||
|
||||
|
@ -215,7 +427,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# return the files
|
||||
return files
|
||||
|
||||
|
||||
def check_file_dict(self, dict_file, title='payload', b64=True):
|
||||
"""Ensure a file dict has all its required items."""
|
||||
|
||||
|
@ -261,7 +472,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# return the first file
|
||||
return first
|
||||
|
||||
|
||||
@endpoint(
|
||||
description=_("Test an openADS 'connexion'")
|
||||
#~ get={
|
||||
|
@ -273,7 +483,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
#~ }
|
||||
)
|
||||
def check_status(self, request=None):
|
||||
def check_status(self, request=None, *args, **kwargs):
|
||||
"""Check avaibility of the openADS.API service."""
|
||||
url = urlparse.urljoin(self.openADS_API_url, '__api__')
|
||||
response = self.requests.get(url)
|
||||
|
@ -282,6 +492,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
methods=['post'],
|
||||
pattern='^(?P<type_dossier>\w+)/?$',
|
||||
example_pattern='{type_dossier}/',
|
||||
|
@ -290,14 +501,15 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
'collectivite': {
|
||||
'description': _("Use this collectivite (instead of the default one)"),
|
||||
'example_value': '3'
|
||||
}
|
||||
},
|
||||
'now': {'description': _("Datetime (or string formatted to: '%Y-%m-%d %H:%M:%S') against which the 'guichet' is checked for opening"), 'example_value': 'DIA'},
|
||||
},
|
||||
post={'description': _("Create an openADS 'dossier'"),
|
||||
'request_body': {
|
||||
'schema': {
|
||||
'application/json': JSON_SCHEMA_CREATE_DOSSIER_IN
|
||||
}
|
||||
},
|
||||
}
|
||||
#~ 'response_body': {
|
||||
#~ 'schema': {
|
||||
#~ 'application/json': JSON_SCHEMA_CREATE_DOSSIER_OUT
|
||||
|
@ -305,7 +517,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
}
|
||||
)
|
||||
def create_dossier(self, request, type_dossier, collectivite=None):
|
||||
def create_dossier(self, request, type_dossier, collectivite=None, now=None, *args, **kwargs):
|
||||
|
||||
# loads the request body as JSON content
|
||||
json_data = json.loads(request.body)
|
||||
|
@ -313,8 +525,43 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# log the request body (filtering the files content)
|
||||
self.log_json_payload(json_data, 'request')
|
||||
|
||||
# get the collectivite ID or use the connecteur's default one
|
||||
collectivite_id = collectivite if collectivite else self.default_collectivite_openADS_id
|
||||
|
||||
# get the collectivite instance
|
||||
try:
|
||||
collectivite = self.get_collectivite(collectivite_id)
|
||||
|
||||
# no collectivite instance matching that ID
|
||||
except Collectivite.DoesNotExist:
|
||||
pass
|
||||
|
||||
# a collectivite instance was found
|
||||
else:
|
||||
|
||||
# the collectivite has a guichet
|
||||
if (hasattr(collectivite, 'guichet') and collectivite.guichet):
|
||||
|
||||
# get the datetime against which the 'guichet' is checked for opening
|
||||
now_fmt = '%Y-%m-%d %H:%M:%S'
|
||||
if not now:
|
||||
now = datetime.datetime.now()
|
||||
elif isinstance(now, basestring):
|
||||
now = datetime.datetime.strptime(now, now_fmt)
|
||||
elif not isinstance(now, datetime.datetime):
|
||||
raise APIError(
|
||||
u"Invalid value of type '%s' for now argument of endpoint '%s' "
|
||||
"(must be: %s)" % (
|
||||
type(now),
|
||||
'create_dossier',
|
||||
"datetime or string formatted to '%s'" % now_fmt))
|
||||
|
||||
# if the guichet is not open
|
||||
if not collectivite.guichet.is_open(now):
|
||||
return {'message': _(u"Guichet closed for collectivite '%s'" % collectivite)}
|
||||
|
||||
# build the payload
|
||||
payload = { "collectivite": int(collectivite) if collectivite else int(self.collectivite) }
|
||||
payload = { "collectivite": int(collectivite_id) }
|
||||
|
||||
payload["terrain"] = {
|
||||
"numero_voie": normalize(json_data['fields']['terrain_numero_voie']),
|
||||
|
@ -362,6 +609,9 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
"nom_voie" : normalize(json_data['fields']['%snom_voie' % prefix]),
|
||||
"code_postal": normalize(json_data['fields']['%scode_postal' % prefix]),
|
||||
"localite" : normalize(json_data['fields']['%slocalite' % prefix])
|
||||
},
|
||||
"coordonnees": {
|
||||
"email": normalize(json_data['fields']['%semail' % prefix])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -404,11 +654,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# set it as an upload
|
||||
upload_file = ContentFile(content)
|
||||
|
||||
# build a hash from the upload
|
||||
file_hash = self.file_digest(upload_file)
|
||||
|
||||
# build a filename (less than 50 chars)
|
||||
filename = file_hash[45:] + '.pdf'
|
||||
# get the file hash
|
||||
file_hash = get_file_digest(upload_file)
|
||||
|
||||
# get the content type if specified
|
||||
if 'content_type' in json_data['fields'][k]:
|
||||
|
@ -419,9 +666,23 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
|
||||
|
||||
# get the filename if specified
|
||||
filename = None
|
||||
if 'filename' in json_data['fields'][k]:
|
||||
filename = json_data['fields'][k]['filename']
|
||||
|
||||
# define the file extension
|
||||
file_extension = get_file_extension(filename, content_type)
|
||||
|
||||
# filename not specified
|
||||
if not filename:
|
||||
|
||||
# build a filename (less than 50 chars)
|
||||
filename = file_hash[40:] + file_extension
|
||||
|
||||
# update the specified filename with an extension, if none
|
||||
elif '.' not in filename:
|
||||
filename += file_extension
|
||||
|
||||
# set the type fichier based on the key (less than 10 chars)
|
||||
type_fichier = re.sub(r'_.*$', '', k)[:10]
|
||||
|
||||
|
@ -477,7 +738,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# decode the recepisse from base 64
|
||||
try:
|
||||
recepisse_content = base64.b64decode(recepisse['b64_content'])
|
||||
base64.b64decode(recepisse['b64_content'])
|
||||
except TypeError:
|
||||
raise APIError('Failed to decode recepisse content from base 64')
|
||||
self.logger.debug("Successfully decoded recepisse from base 64")
|
||||
|
@ -502,6 +763,9 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
for f in files:
|
||||
rand_id = base64.urlsafe_b64encode(os.urandom(6))
|
||||
FF = ForwardFile()
|
||||
FF.connecteur = self
|
||||
if isinstance(collectivite, Collectivite):
|
||||
FF.collectivite = collectivite
|
||||
FF.numero_demande = rand_id
|
||||
FF.numero_dossier = numero_dossier
|
||||
for k in ['type_fichier', 'orig_filename', 'content_type', 'file_hash']:
|
||||
|
@ -519,6 +783,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
job = self.add_job('upload_user_files',
|
||||
natural_id=numero_dossier,
|
||||
request=None,
|
||||
type_dossier=type_dossier,
|
||||
numero_dossier=numero_dossier,
|
||||
file_ids=file_ids)
|
||||
|
@ -536,11 +801,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
'recepisse' : recepisse
|
||||
}
|
||||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
description=_("Get informations about an openADS 'dossier'"),
|
||||
pattern='^(?P<type_dossier>\w+)/?$',
|
||||
example_pattern='{type_dossier}/',
|
||||
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
|
||||
example_pattern='{type_dossier}/{numero_dossier}',
|
||||
parameters={
|
||||
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
|
||||
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}
|
||||
|
@ -554,7 +819,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
#~ }
|
||||
)
|
||||
def get_dossier(self, request, type_dossier, numero_dossier):
|
||||
def get_dossier(self, request, type_dossier, numero_dossier, *args, **kwargs):
|
||||
|
||||
# make a request to openADS.API
|
||||
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s' % (type_dossier, numero_dossier))
|
||||
|
@ -578,27 +843,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# return the response as-is
|
||||
return response.json()
|
||||
|
||||
|
||||
def upload2ForwardFile(self, path, numero_dossier, type_fichier):
|
||||
"""Convert a file path to a ForwardFile."""
|
||||
if path:
|
||||
rand_id = base64.urlsafe_b64encode(os.urandom(6))
|
||||
fwd_file = ForwardFile()
|
||||
fwd_file.numero_demande = rand_id
|
||||
fwd_file.numero_dossier = numero_dossier
|
||||
fwd_file.type_fichier = type_fichier
|
||||
fwd_file.orig_filename = os.path.basename(path)
|
||||
fwd_file.content_type = magic.from_file(path, mime=True)
|
||||
with open(path, 'r') as fp:
|
||||
fwd_file.file_hash = self.file_digest(fp)
|
||||
fwd_file.upload_file = File(open(path, 'r'))
|
||||
fwd_file.upload_status = 'pending'
|
||||
return fwd_file
|
||||
return None
|
||||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
description=_("Get informations about the forwarding of user files to openADS"),
|
||||
pattern='^(?P<numero_dossier>\w+)/?$',
|
||||
example_pattern='{numero_dossier}/',
|
||||
parameters={
|
||||
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
|
||||
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
|
||||
|
@ -612,7 +861,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
#~ }
|
||||
)
|
||||
def get_fwd_files(self, request, numero_dossier, fichier_id=None):
|
||||
def get_fwd_files(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
|
||||
payload = []
|
||||
fwd_files = []
|
||||
|
||||
|
@ -651,9 +900,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# return the payload containing the list of files
|
||||
return payload
|
||||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
description=_("Get informations about the forwarding of a user file to openADS"),
|
||||
pattern='^(?P<numero_dossier>\w+)/?$',
|
||||
example_pattern='{numero_dossier}/',
|
||||
parameters={
|
||||
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
|
||||
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
|
||||
|
@ -667,7 +918,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
#~ }
|
||||
)
|
||||
def get_fwd_files_status(self, request, numero_dossier, fichier_id=None):
|
||||
def get_fwd_files_status(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
|
||||
|
||||
# get all files matching 'numero_dossier' and 'fichier_id'
|
||||
fwd_files = self.get_fwd_files(request, numero_dossier, fichier_id)
|
||||
|
@ -695,11 +946,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# respond with the payload
|
||||
return payload
|
||||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
description= _("Get a 'courrier' from an openADS 'dossier'"),
|
||||
pattern='^(?P<type_dossier>\w+)/?$',
|
||||
example_pattern='{type_dossier}/',
|
||||
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/(?P<lettre_type>\w+)/?$',
|
||||
example_pattern='{type_dossier}/{numero_dossier}/{lettre_type}',
|
||||
parameters={
|
||||
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
|
||||
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
|
||||
|
@ -714,7 +965,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
#~ }
|
||||
#~ }
|
||||
)
|
||||
def get_courrier(self, request, type_dossier, numero_dossier, lettre_type):
|
||||
def get_courrier(self, request, type_dossier, numero_dossier, lettre_type, *args, **kwargs):
|
||||
|
||||
# make a request to openADS.API
|
||||
url = urlparse.urljoin(
|
||||
|
@ -742,14 +993,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# decode the courrier from base 64
|
||||
try:
|
||||
courrier_content = base64.b64decode(courrier['b64_content'])
|
||||
base64.b64decode(courrier['b64_content'])
|
||||
except TypeError:
|
||||
raise APIError('Failed to decode courrier content from base 64')
|
||||
|
||||
# return the 'courrier' file
|
||||
return {'courrier': courrier}
|
||||
|
||||
|
||||
def get_response_error(self, response):
|
||||
"""Return a error string from an HTTP response."""
|
||||
try:
|
||||
|
@ -780,44 +1030,94 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else ''
|
||||
return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '')
|
||||
|
||||
|
||||
@endpoint(
|
||||
perm='can_access',
|
||||
description= _("Trigger the uploading of user's files to openADS"),
|
||||
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
|
||||
example_pattern='{type_dossier}/{numero_dossier}',
|
||||
parameters={
|
||||
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
|
||||
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
|
||||
'file_ids' : {'description': _("List of ForwardFile IDs to upload (coma separated)"), 'example_value': '12,18'}
|
||||
},
|
||||
#~ get={
|
||||
#~ 'description': _("Trigger the uploading of user's files to openADS"),
|
||||
#~ 'response_body': {
|
||||
#~ 'schema': {
|
||||
#~ 'application/json': JSON_SCHEMA_UPLOAD_USER_FILES
|
||||
#~ }
|
||||
#~ }
|
||||
#~ }
|
||||
)
|
||||
# @raise ForwareFile.DoesNotExist if not found
|
||||
def upload_user_files(self, type_dossier, numero_dossier, file_ids):
|
||||
def upload_user_files(self, request, type_dossier, numero_dossier, file_ids=None, *args, **kwargs):
|
||||
"""A Job to forward user uploaded files to openADS."""
|
||||
|
||||
payload = []
|
||||
fwd_files = []
|
||||
|
||||
if file_ids:
|
||||
|
||||
# if file_ids is a string
|
||||
if isinstance(file_ids, basestring):
|
||||
file_ids = [int(fid) for fid in file_ids.split(',')]
|
||||
|
||||
# invalid input
|
||||
elif not isinstance(file_ids, list):
|
||||
raise TypeError(
|
||||
"Invalid 'file_ids' argument type '%s' "
|
||||
"(must be string or list)" % type(file_ids))
|
||||
|
||||
# a list of ForwardFile IDs was specified
|
||||
if file_ids:
|
||||
fwd_files = ForwardFile.objects.filter(id__in=file_ids).all()
|
||||
|
||||
# check that all ids where found
|
||||
fwd_files_ids = set([ff.id for ff in fwd_files])
|
||||
file_ids_diff = [item for item in file_ids if item not in fwd_files_ids]
|
||||
if file_ids_diff:
|
||||
raise ForwardFile.DoesNotExist(
|
||||
"The following ForwardFile IDs were not found: %s." % file_ids_diff)
|
||||
|
||||
# filter out files not in status 'pending'
|
||||
fwd_files_filtered = fwd_files.filter(upload_status='pending').all()
|
||||
fwd_filtered_ids = set([ff.id for ff in fwd_files_filtered])
|
||||
file_ids_diff = [item for item in file_ids if item not in fwd_filtered_ids]
|
||||
if file_ids_diff:
|
||||
self.logger.warning(
|
||||
"The following ForwardFile IDs were not in status '%s' "
|
||||
"when asked specificaly to upload them: %s." % ('pending', file_ids_diff))
|
||||
fwd_files = fwd_files_filtered
|
||||
|
||||
# no files_ids where specified
|
||||
else:
|
||||
|
||||
# process all ForwardFiles of the 'dossier' (in status 'pending')
|
||||
fwd_files = ForwardFile.objects.filter(
|
||||
numero_dossier=numero_dossier,
|
||||
upload_status='pending'
|
||||
).all()
|
||||
|
||||
# for every file ids specified (in parameters of this job)
|
||||
for fid in file_ids:
|
||||
self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fid)
|
||||
for fwd_file in fwd_files:
|
||||
self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fwd_file.id)
|
||||
|
||||
# get the matching forward file
|
||||
fwd_file = ForwardFile.objects.get(id=fid)
|
||||
# add the file content and data to the payload
|
||||
payload.append({
|
||||
'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
|
||||
'content_type' : fwd_file.content_type,
|
||||
'b64_content' : base64.b64encode(fwd_file.upload_file.read()),
|
||||
'file_type' : fwd_file.type_fichier
|
||||
})
|
||||
self.logger.debug("upload_user_files() payload added")
|
||||
|
||||
# found one
|
||||
if fwd_file:
|
||||
self.logger.debug("upload_user_files() got ForwardFile")
|
||||
|
||||
# add the file content and data to the payload
|
||||
payload.append({
|
||||
'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
|
||||
'content_type' : fwd_file.content_type,
|
||||
'b64_content' : base64.b64encode(fwd_file.upload_file.read()),
|
||||
'file_type' : fwd_file.type_fichier
|
||||
})
|
||||
self.logger.debug("upload_user_files() payload added")
|
||||
|
||||
# update the file upload data (status and attempts)
|
||||
fwd_file.upload_status = 'uploading'
|
||||
fwd_file.upload_attempt += 1
|
||||
fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt
|
||||
self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg)
|
||||
fwd_file.save()
|
||||
self.logger.debug("upload_user_files() ForwardFile saved")
|
||||
|
||||
# append the forwarded file to the list
|
||||
fwd_files.append(fwd_file)
|
||||
# update the file upload data (status and attempts)
|
||||
fwd_file.upload_status = 'uploading'
|
||||
fwd_file.upload_attempt += 1
|
||||
fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt
|
||||
self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg)
|
||||
fwd_file.save()
|
||||
self.logger.debug("upload_user_files() ForwardFile saved")
|
||||
|
||||
# if files need to be forwarded
|
||||
if payload:
|
||||
|
@ -837,6 +1137,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# reponse is an error
|
||||
if response.status_code // 100 != 2:
|
||||
error = self.get_response_error(response)
|
||||
self.logger.warning(u"Request [POST] '%s' failed with error: '%s'", url, error)
|
||||
|
||||
# update every files status as 'failed' and save the error message
|
||||
for fwd_file in fwd_files:
|
||||
|
@ -852,12 +1154,16 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
file_ids
|
||||
)
|
||||
|
||||
# respond with APIError
|
||||
if request:
|
||||
raise APIError(error)
|
||||
|
||||
# response is not an error
|
||||
else:
|
||||
|
||||
# load the reponse as JSON
|
||||
try:
|
||||
result = response.json()
|
||||
response.json()
|
||||
|
||||
# in case of failure
|
||||
except ValueError:
|
||||
|
@ -875,6 +1181,10 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
fwd_files
|
||||
)
|
||||
|
||||
# respond with APIError
|
||||
if request:
|
||||
raise APIError(u'No JSON content returned: %r' % response.content[:1000])
|
||||
|
||||
# response correctly loaded as JSON
|
||||
else:
|
||||
|
||||
|
@ -885,20 +1195,21 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
fwd_file.upload_status = 'success'
|
||||
fwd_file.upload_msg = 'uploaded successfuly'
|
||||
|
||||
# delete file content (on success)
|
||||
# save the file (content will be deleted automatically)
|
||||
fpath = fwd_file.upload_file.path
|
||||
fwd_file.upload_file.delete()
|
||||
|
||||
# save the file
|
||||
fwd_file.save()
|
||||
|
||||
# log the success message
|
||||
self.logger.debug(
|
||||
u"upload_user_files() flaging file '%s' has transfered (deleted '%s')",
|
||||
u"upload_user_files() flaging file '%s' as 'transfered' (deleted '%s')",
|
||||
fwd_file.id,
|
||||
fpath
|
||||
)
|
||||
|
||||
# respond with success
|
||||
if request:
|
||||
return {'message': 'all files transfered successfully'}
|
||||
|
||||
# no file need to be forwarded
|
||||
else:
|
||||
self.logger.warning(
|
||||
|
@ -907,15 +1218,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
file_ids
|
||||
)
|
||||
|
||||
|
||||
# copy-pasted from 'wcs/qommon/misc.py'
|
||||
def file_digest(self, content, chunk_size=100000):
|
||||
"""Return a hash for the content specified."""
|
||||
digest = hashlib.sha256()
|
||||
content.seek(0)
|
||||
def read_chunk():
|
||||
return content.read(chunk_size)
|
||||
for chunk in iter(read_chunk, ''):
|
||||
digest.update(chunk)
|
||||
return digest.hexdigest()
|
||||
|
||||
# respond with message
|
||||
if request:
|
||||
return {'message': 'no file to transfer'}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
/* overrides for atreal_openads */
|
||||
|
||||
div#header h1.breadcrumbs a:last-child {
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.connecteur-collectivites .collectivite .service-urbanisme {
|
||||
margin-left: 0.1em;
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
{% extends "passerelle/manage.html" %}
|
||||
{% load i18n passerelle gadjo staticfiles %}
|
||||
|
||||
{% block extrascripts %}
|
||||
{{ block.super }}
|
||||
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
|
||||
{% endblock %}
|
||||
|
||||
{% block breadcrumb %}
|
||||
{{ block.super }}
|
||||
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
|
||||
{% endblock %}
|
||||
|
||||
{% block appbar %}
|
||||
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.connecteur }} - {{ object.name }}{% else %}{% trans 'New' %}{% endif %}</h2>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<form method="post" enctype="multipart/form-data">
|
||||
<div id="form-content">
|
||||
{% csrf_token %}
|
||||
{{ form|with_template }}
|
||||
</div>
|
||||
{% block buttons %}
|
||||
<div class="buttons">
|
||||
<button class="submit-button">{% trans "Save" %}</button>
|
||||
<button class="cancel">{% trans "Cancel" %}</button>
|
||||
</div>
|
||||
{% endblock %}
|
||||
</form>
|
||||
|
||||
{% endblock %}
|
|
@ -0,0 +1,99 @@
|
|||
{% extends "passerelle/manage.html" %}
|
||||
{% load i18n passerelle gadjo staticfiles %}
|
||||
|
||||
{% block extrascripts %}
|
||||
{{ block.super }}
|
||||
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
|
||||
{% endblock %}
|
||||
|
||||
{% block breadcrumb %}
|
||||
{{ block.super }}
|
||||
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
|
||||
{% endblock %}
|
||||
|
||||
{% block appbar %}
|
||||
<h2>{{ view.model.get_verbose_name_plural }}</h2>
|
||||
<span class="actions">
|
||||
{% if collectivite_add_url %}
|
||||
<a rel="popup" href="{{ collectivite_add_url }}?back-to=list-collectivites">{% trans "Add a Collectivite" %}</a>
|
||||
{% endif %}
|
||||
</span>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<table id="list-collectivite">
|
||||
{% block list %}
|
||||
{% if object_list %}
|
||||
<thead>
|
||||
<tr>
|
||||
<th class="actions">{% trans "Actions" %}</th>
|
||||
{% for field in view.model.get_fields %}
|
||||
{% if field.name != "connecteur" %}
|
||||
<th class="{{field.name}}">
|
||||
{% if field.name == "guichet" %}{% trans "Guichet" %}
|
||||
{% elif field.name == "forward_file" %}{% trans "Forward Files" %}
|
||||
{% else %}{% trans field.verbose_name %}
|
||||
{% endif %}
|
||||
</th>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for col in object_list %}
|
||||
<tr>
|
||||
<td class="actions">
|
||||
<span class="edit-link" ><a rel="popup" href="{{col.get_edit_url}}?back-to=list-collectivites" >{% trans "edit" %}</a></span>
|
||||
<span class="delete-link"><a rel="popup" href="{{col.get_delete_url}}?back-to=list-collectivites">{% trans "delete" %}</a></span>
|
||||
</td>
|
||||
{% for field, value in col.get_fields_kv %}
|
||||
{% if field.name != "connecteur" %}
|
||||
<td class="{{field.name}}">
|
||||
{% if field.name != "guichet" and field.name != "name" and field.name != "forward_file" %}
|
||||
{{value}}
|
||||
{% elif field.name == "name" %}
|
||||
<a href="{{col.get_absolute_url}}">{{value}}</a>
|
||||
{% elif field.name == "guichet" %}
|
||||
{% if value %}<a href="{{value.get_absolute_url}}">{{value}}</a>{% else %}{% trans "None" %}{% endif %}
|
||||
{% elif field.name == "forward_file" %}
|
||||
{% if value %}<a href="{{ col.get_absolute_url }}/list-forward-files">{{ value.count }} {% trans "forward file(s)" %}</a>{% else %}{% trans "None" %}{% endif %}
|
||||
{% endif %}
|
||||
</td>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
{% if is_paginated %}
|
||||
<tfoot>
|
||||
<tr>
|
||||
<td class="pagination" colspan="{{ view.model.get_fields|length }}">
|
||||
<span class="step-links">
|
||||
{% if page_obj.has_previous %}
|
||||
<a href="?page=1">« {% trans "first" %}</a>
|
||||
<a href="?page={{ contacts.previous_page_number }}">{% trans "previous" %}</a>
|
||||
{% endif %}
|
||||
|
||||
<span class="current">
|
||||
{% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }}
|
||||
</span>
|
||||
|
||||
{% if page_obj.has_next %}
|
||||
<a href="?page={{ page_obj.next_page_number }}">{% trans "next" %}</a>
|
||||
<a href="?page={{ page_obj.paginator.num_pages }}">{% trans "last" %} »</a>
|
||||
{% endif %}
|
||||
</span>
|
||||
<td>
|
||||
</tr>
|
||||
</tfoot>
|
||||
{% endif %}
|
||||
{% else %}
|
||||
<tr class="nodata">
|
||||
<td>{% trans "No data" %}</td>
|
||||
</tr>
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
</table>
|
||||
|
||||
{% endblock %}
|
|
@ -0,0 +1,63 @@
|
|||
{% extends "passerelle/manage.html" %}
|
||||
{% load i18n passerelle gadjo staticfiles %}
|
||||
|
||||
{% block extrascripts %}
|
||||
{{ block.super }}
|
||||
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
|
||||
{% endblock %}
|
||||
|
||||
{% block breadcrumb %}
|
||||
{{ block.super }}
|
||||
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
|
||||
{% endblock %}
|
||||
|
||||
{% block appbar %}
|
||||
<h2>{{ view.model.get_verbose_name }}{% if object.name %} - {{ object.name }}{% endif %}</h2>
|
||||
<span class="actions">
|
||||
{% if object|can_delete:request.user %}
|
||||
<a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
|
||||
{% endif %}
|
||||
{% if object|can_edit:request.user %}
|
||||
<a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
|
||||
{% endif %}
|
||||
{% if not object.guichet and object|can_edit:request.user and guichet_add_url %}
|
||||
<a rel="popup" href="{{ guichet_add_url }}">{% trans "Add a Guichet" %}</a>
|
||||
{% endif %}
|
||||
</span>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<div id="description">
|
||||
{% block description %}
|
||||
|
||||
{% for field, value in object.get_fields_kv %}
|
||||
<p class="{{field.name}}">
|
||||
{% if field.name == "guichet" %}
|
||||
{% trans "Guichet" %} {% trans ":" %}
|
||||
{% if value %}
|
||||
<a href="{{value.get_absolute_url}}">{{value}}</a>
|
||||
{% else %}
|
||||
{% trans "None" %}
|
||||
{% endif %}
|
||||
{% elif field.name != "connecteur" and field.name != "forward_file" and field.name != "id" and field.name != "name" %}
|
||||
{% trans field.verbose_name %} {% trans ":" %} {{value}}
|
||||
{% endif %}
|
||||
</p>
|
||||
{% endfor %}
|
||||
{% if object.forward_files.count %}
|
||||
<p class="title">
|
||||
{% trans "Forward files" %}
|
||||
{% trans ":" %}
|
||||
<a href="{% if forward_files_list_url %}{{forward_files_list_url}}{% else %}{{ object.forward_files.first.get_list_url }}{% endif %}">
|
||||
{{ object.forward_files.count }}
|
||||
{% if object.forward_files.count == 1 %}{% trans "forward file" %}
|
||||
{% else %}{% trans "forward files" %}
|
||||
{% endif %}
|
||||
</a>
|
||||
</p>
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
</div>
|
||||
|
||||
{% endblock %}
|
|
@ -0,0 +1,77 @@
|
|||
{% extends "passerelle/manage/service_view.html" %}
|
||||
{% load i18n passerelle gadjo staticfiles %}
|
||||
|
||||
{% block extrascripts %}
|
||||
{{ block.super }}
|
||||
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
|
||||
{% endblock %}
|
||||
|
||||
{% block appbar %}
|
||||
<h2>{{ view.model.get_verbose_name }} - {{ object.title }}
|
||||
{% with status=object.get_availability_status %}
|
||||
{% if status %}
|
||||
{% if status.down %}<span class="down" title="{{status.message}} {% trans 'since:' %} {{status.start_timestamp|date:"SHORT_DATETIME_FORMAT"}} ">{% trans 'Down' %}</span>{% endif %}
|
||||
{% endif %}
|
||||
{% endwith %}
|
||||
</h2>
|
||||
<span class="actions">
|
||||
{% if object|can_edit:request.user and has_check_status %}
|
||||
<a rel="popup" href="{% url 'manage-availability' resource_type=object|resource_type resource_pk=object.id %}">{% trans 'availability check parameters' %}</a>
|
||||
{% endif %}
|
||||
{% if object|can_edit:request.user %}
|
||||
<a rel="popup" href="{% url 'logging-parameters' resource_type=object|resource_type resource_pk=object.id %}">{% trans 'logging parameters' %}</a>
|
||||
{% endif %}
|
||||
{% if object|can_delete:request.user %}
|
||||
<a rel="popup" href="{{ object.get_delete_url }}">{% trans 'delete' %}</a>
|
||||
{% endif %}
|
||||
{% if object|can_edit:request.user %}
|
||||
<a rel="popup" href="{{ object.get_edit_url }}">{% trans 'edit' %}</a>
|
||||
{% endif %}
|
||||
{% if object|can_edit:request.user and collectivite_add_url%}
|
||||
<a rel="popup" href="{{ collectivite_add_url }}">{% trans "Add a collectivite" %}</a>
|
||||
{% endif %}
|
||||
</span>
|
||||
{% endblock %}
|
||||
|
||||
{% block description %}
|
||||
{{ block.super }}
|
||||
{% if object.id %}
|
||||
{% if object.collectivites.count %}
|
||||
{% if object.collectivites.count == 1 %}
|
||||
<p class="title">
|
||||
{% trans 'Collectivites' %}
|
||||
{% trans ":" %}
|
||||
<a href="{{ object.collectivites.first.get_absolute_url }}">{{ object.collectivites.first }}</a>
|
||||
</p>
|
||||
{% elif object.collectivites.count < 10 %}
|
||||
<p class="title">{% trans 'Collectivites' %}</p>
|
||||
<ul class="connecteur-collectivites">
|
||||
{% for col in object.collectivites.all %}
|
||||
<li class="collectivite"><a href="{{ col.get_absolute_url }}">{{ col.name }}</a>
|
||||
{% if col.guichet %}<span class="guichet"> ( {{ col.guichet }} ) </span>{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% else %}
|
||||
<p class="title">
|
||||
{% trans 'Collectivites' %}
|
||||
{% trans ":" %}
|
||||
<a href="{{ object.collectivites.first.get_list_url }}">{{ object.collectivites.count }} {% trans 'collectivites' %}</a>
|
||||
</p>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% if object.forward_files.count %}
|
||||
<p class="title">
|
||||
{% trans "Forward files" %}
|
||||
{% trans ":" %}
|
||||
<a href="{{ object.forward_files.first.get_list_url }}">
|
||||
{{ object.forward_files.count }}
|
||||
{% if object.forward_files.count == 1 %}{% trans "forward file" %}
|
||||
{% else %}{% trans "forward files" %}
|
||||
{% endif %}
|
||||
</a>
|
||||
</p>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}

{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}

{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{ connecteur.get_absolute_url }}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{ collectivite.get_absolute_url }}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}

{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}</h2>
{% endblock %}

{% block content %}

<form method="post" enctype="multipart/form-data">
    <div id="form-content">
        {% csrf_token %}
        {{ form|with_template }}
    </div>
    {% block buttons %}
    <div class="buttons">
        <button class="submit-button">{% trans "Save" %}</button>
        <button class="cancel">{% trans "Cancel" %}</button>
    </div>
    {% endblock %}
</form>

{% endblock %}
|
|
@ -0,0 +1,92 @@
|
|||
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}

{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}

{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{ connecteur.get_absolute_url }}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{ collectivite.get_absolute_url }}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}

{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% trans 'list' %}</h2>
{% endblock %}

{% block content %}

<table id="list-forward-files">
{% block list %}
{% if object_list %}
    <thead>
        <tr>
            <th class="actions">{% trans "Actions" %}</th>
            {% for field in view.model.get_fields %}
            {# 'connecteur', 'file_hash' and 'orig_filename' are not shown in the list #}
            {% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %}
            <th class="{{ field.name }}">{% trans field.verbose_name %}</th>
            {% endif %}
            {% endfor %}
        </tr>
    </thead>
    <tbody>
    {% for ff in object_list %}
        <tr>
            <td class="actions">
                <span class="edit-link"><a rel="popup" href="{{ ff.get_edit_url }}?back-to={% if collectivite %}col-{% endif %}list-forward-files">{% trans "edit" %}</a></span>
                <span class="delete-link"><a rel="popup" href="{{ ff.get_delete_url }}?back-to={% if collectivite %}col-{% endif %}list-forward-files">{% trans "delete" %}</a></span>
            </td>
            {% for field, value in ff.get_fields_kv %}
            {% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %}
            <td class="{{ field.name }}">
                {% if field.name != "collectivite" and field.name != "upload_file" %}
                {{ value }}
                {% elif field.name == "collectivite" %}
                {% if value %}<a href="{{ value.get_absolute_url }}">{{ value }}</a>
                {% else %}{% trans "None" %}
                {% endif %}
                {% elif field.name == "upload_file" %}
                {% if value %}<a href="{{ value.url }}">{{ value }}</a>
                {% else %}{% trans "None" %}
                {% endif %}
                {% endif %}
            </td>
            {% endif %}
            {% endfor %}
        </tr>
    {% endfor %}
    </tbody>
    {% if is_paginated %}
    <tfoot>
        <tr>
            {# visible columns = 'Actions' + model fields minus the 3 hidden ones #}
            <td class="pagination" colspan="{{ view.model.get_fields|length|add:"-2" }}">
                <span class="step-links">
                    {% if page_obj.has_previous %}
                    <a href="?page=1">« {% trans "first" %}</a>
                    {# BUGFIX: was '{{ contacts.previous_page_number }}', an undefined context variable #}
                    <a href="?page={{ page_obj.previous_page_number }}">{% trans "previous" %}</a>
                    {% endif %}

                    <span class="current">
                        {% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }}
                    </span>

                    {% if page_obj.has_next %}
                    <a href="?page={{ page_obj.next_page_number }}">{% trans "next" %}</a>
                    <a href="?page={{ page_obj.paginator.num_pages }}">{% trans "last" %} »</a>
                    {% endif %}
                </span>
            </td> {# BUGFIX: was an opening '<td>' which left the cell unclosed #}
        </tr>
    </tfoot>
    {% endif %}
{% else %}
    <tr class="nodata">
        <td>{% trans "No data" %}</td>
    </tr>
{% endif %}
{% endblock %}
</table>

{% endblock %}
|
|
@ -0,0 +1,56 @@
|
|||
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}

{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}

{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{ connecteur.get_absolute_url }}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{ collectivite.get_absolute_url }}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}

{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}</h2>
<span class="actions">
    {% if object|can_delete:request.user %}
    <a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
    {% endif %}
    {% if object|can_edit:request.user %}
    <a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
    {% endif %}
</span>
{% endblock %}

{% block content %}

<div id="description">
{% block description %}

    {% for field, value in object.get_fields_kv %}
    <p class="{{ field.name }}">
        {% if field.name == "collectivite" %}
        {% trans "Collectivite" %} {% trans ":" %}
        {% if value %}
        <a href="{{ value.get_absolute_url }}">{{ value }}</a>
        {% else %}
        {% trans "None" %}
        {% endif %}
        {% elif field.name == "upload_file" %}
        {% trans "File" %} {% trans ":" %}
        {% if value %}
        <a href="{{ value.url }}">{{ value }}</a>
        {% else %}
        {% trans "None" %}
        {% endif %}
        {% elif field.name != "connecteur" and field.name != "id" %}
        {% trans field.verbose_name %} {% trans ":" %} {{ value }}
        {% endif %}
    </p>
    {% endfor %}
{% endblock %}
</div>

{% endblock %}
|
|
@ -0,0 +1,34 @@
|
|||
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}

{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}

{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{ connecteur.get_absolute_url }}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{ collectivite.get_absolute_url }}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}

{% block appbar %}
<h2>{{ view.model.get_verbose_name }} {% if object.id and object.collectivite %} - {{ object.collectivite }}{% else %}{% trans 'New' %}{% endif %}</h2>
{% endblock %}

{% block content %}

<form method="post" enctype="multipart/form-data">
    <div id="form-content">
        {% csrf_token %}
        {{ form|with_template }}
    </div>
    {% block buttons %}
    <div class="buttons">
        <button class="submit-button">{% trans "Save" %}</button>
        <button class="cancel">{% trans "Cancel" %}</button>
    </div>
    {% endblock %}
</form>

{% endblock %}
|
|
@ -0,0 +1,44 @@
|
|||
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}

{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}

{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{ connecteur.get_absolute_url }}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{ collectivite.get_absolute_url }}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}

{% block appbar %}
<h2>{{ view.model.get_verbose_name }}</h2>
<span class="actions">
    {% if object|can_delete:request.user %}
    <a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
    {% endif %}
    {% if object|can_edit:request.user %}
    <a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
    {% endif %}
</span>
{% endblock %}

{% block content %}

<div id="description">
{% block description %}

    {% for field, value in object.get_fields_kv %}
    {% if field.name != "id" %}
    <p class="{{ field.name }}">
        {% if field.name != "collectivite" %}
        {% trans field.verbose_name %} {% trans ":" %} {{ value }}
        {% endif %}
    </p>
    {% endif %}
    {% endfor %}
{% endblock %}
</div>

{% endblock %}
|
|
@ -0,0 +1,91 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import re
|
||||
|
||||
from django.conf.urls import url
|
||||
|
||||
from .views import (
|
||||
AtrealOpenadsView,
|
||||
ForwardFileView,
|
||||
ForwardFileListView,
|
||||
ForwardFileUpdateView,
|
||||
ForwardFileDeleteView,
|
||||
CollectiviteView,
|
||||
CollectiviteListView,
|
||||
CollectiviteCreateView,
|
||||
CollectiviteUpdateView,
|
||||
CollectiviteDeleteView,
|
||||
GuichetView,
|
||||
GuichetCreateView,
|
||||
GuichetUpdateView,
|
||||
GuichetDeleteView
|
||||
)
|
||||
|
||||
|
||||
urlpatterns = [
    url(r'^(?P<slug>[\w,-]+)/$', AtrealOpenadsView.as_view(), name='view-connector')
]

management_urlpatterns = []

# Build one management URL per CRUD view, deriving the action and the URL
# name from the view's class name and its model.
for view in [
    ForwardFileView,
    ForwardFileListView,
    ForwardFileUpdateView,
    ForwardFileDeleteView,
    CollectiviteView,
    CollectiviteListView,
    CollectiviteCreateView,
    CollectiviteUpdateView,
    CollectiviteDeleteView,
    GuichetView,
    GuichetCreateView,
    GuichetUpdateView,
    GuichetDeleteView
]:
    # the action is encoded in the class name suffix (default: 'view')
    view_class_name = str(view.__name__)
    m = re.search(r'^.*(Create|Update|Delete|List)View$', view_class_name)
    view_action = m.group(1).lower() if m else 'view'

    # 'update' is exposed as 'edit' in URL names
    url_prefix = view_action.replace('update', 'edit') + '-'

    # BUGFIX: these patterns contain regex escapes (\w) so they must be raw
    # strings (non-raw '\w' is an invalid string escape on python 3)
    regex_base = r'^(?P<connecteur>[\w,-]+)/'
    regex_pkey = r'/(?P<pk>[\w,-]+)'

    url_name = url_prefix + view.model.get_class_name_dash_case()
    # no URL prefix for action 'view'
    regex_url = '%s%s' % (url_prefix if view_action != 'view' else '',
                          view.model.get_class_name_dash_case())

    # no primary key for actions 'create' and 'list'
    if view_action in ['create', 'list']:
        regex_pkey = ''

    # plural form of the URL for action 'list', and no prefix
    if view_action == 'list':
        url_name = url_prefix + view.model.get_class_name_plural_dash_case()
        regex_url = view.model.get_class_name_plural_dash_case()

    # 'Guichet' URLs are nested under their collectivite
    if view.model.get_class_name() == 'Guichet':
        regex_base += r'collectivite/(?P<collectivite>[\w,-]+)/'

    # build the regex
    regex = regex_base + regex_url + regex_pkey + '$'

    # add the url pattern to the management list
    management_urlpatterns += [url(regex, view.as_view(), name=url_name)]

# extra ForwardFile 'list' URL pattern scoped to a Collectivite
ff_list_regex_url = ForwardFileListView.model.get_class_name_plural_dash_case()
management_urlpatterns += [
    url(
        r'^(?P<connecteur>[\w,-]+)/collectivite/(?P<collectivite>[\w,-]+)/' + ff_list_regex_url + '$',
        ForwardFileListView.as_view(),
        name='col-list-' + ff_list_regex_url
    )
]
|
|
@ -0,0 +1,258 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# passerelle - uniform access to multiple data sources and services
|
||||
# Copyright (C) 2018 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import base64
|
||||
import datetime
|
||||
import re
|
||||
import hashlib
|
||||
import copy
|
||||
import mimetypes
|
||||
import sys
|
||||
|
||||
from HTMLParser import HTMLParser
|
||||
|
||||
from django.urls import reverse_lazy
|
||||
|
||||
|
||||
def to_dash_case(camel_str):
    """Convert a CamelCase string to dash-case (e.g. 'ForwardFile' -> 'forward-file')."""
    # first pass: break before a capital followed by lowercase letters,
    # second pass: break between a lowercase/digit and a capital (handles acronyms)
    partial = re.sub(r'(.)([A-Z][a-z]+)', r'\1-\2', camel_str)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1-\2', partial).lower()
|
||||
|
||||
|
||||
# from: https://stackoverflow.com/a/13848698
|
||||
# from: https://stackoverflow.com/a/13848698
def force_encoded_string_output(func, default_enc='utf-8'):
    """Decorator that encodes the (unicode) result of *func* to bytes on python 2.

    On python 3 the function is returned unchanged. On python 2 the wrapper
    encodes the return value with the stdout encoding (or *default_enc*),
    which prevents UnicodeEncodeError in __str__/__repr__ implementations.
    """
    if sys.version_info.major < 3:
        # local import: keeps this fix self-contained in the function
        from functools import wraps

        # preserve the wrapped function's metadata (__name__, __doc__, ...)
        @wraps(func)
        def _func(*args, **kwargs):
            return func(*args, **kwargs).encode(sys.stdout.encoding or default_enc)
        return _func
    return func
|
||||
|
||||
|
||||
class MLStripper(HTMLParser):
    """HTML parser that removes html tags, keeping only the text data."""

    def __init__(self):
        # BUGFIX: initialise the parent parser instead of only calling
        # reset(); HTMLParser.__init__ calls reset() itself on python 2
        # (identical behavior) and additionally sets attributes such as
        # 'convert_charrefs' that feed() requires on python 3.
        HTMLParser.__init__(self)
        self.fed = []  # collected text fragments

    def handle_data(self, d):
        """Accumulate every text node encountered while parsing."""
        self.fed.append(d)

    def get_data(self):
        """Return all the accumulated text as a single string."""
        return ''.join(self.fed)
|
||||
|
||||
|
||||
def strip_tags(html):
    """Return *html* with all markup tags removed (text content only)."""
    stripper = MLStripper()
    stripper.feed(html)
    return stripper.get_data()
|
||||
|
||||
|
||||
def clean_spaces(text):
    """Remove extra spaces and line breaks from a string.

    Both real control characters and their backslash-escaped textual
    forms ('\\n', '\\r', '\\t') are replaced by spaces, then runs of
    spaces are collapsed and the result is stripped.
    """
    for token in ('\n', '\r', '\t', '\\n', '\\r', '\\t'):
        text = text.replace(token, ' ')
    return re.sub(r' +', ' ', text).strip()
|
||||
|
||||
|
||||
def normalize(value):
    """Normalize a value to be sent to the openADS API.

    None becomes an empty string; any other value is coerced to unicode
    and cleaned of extra spaces/line breaks.
    """
    if value is None:
        return ''
    if isinstance(value, unicode):
        return clean_spaces(value)
    return clean_spaces(unicode(value))
|
||||
|
||||
|
||||
def get_file_data(path, b64=True):
    """Return the content of a file, base64-encoded unless *b64* is False.

    Arguments:
    - path: filesystem path of the file to read
    - b64:  True (default) to return the content base64-encoded
    """
    # BUGFIX: open in binary mode — base64 operates on raw bytes and text
    # mode would corrupt binary content on some platforms
    with open(path, 'rb') as f:
        data = f.read()
    return base64.b64encode(data) if b64 else data
|
||||
|
||||
|
||||
# copy-pasted from 'wcs/qommon/misc.py'
def get_file_digest(content, chunk_size=100000):
    """Return the SHA-256 hex digest of a file-like object *content*.

    The stream is rewound first and read by chunks of *chunk_size* bytes.
    """
    digest = hashlib.sha256()
    content.seek(0)

    def read_chunk():
        return content.read(chunk_size)

    # BUGFIX: use b'' as the end-of-stream sentinel — it equals '' on
    # python 2 (same behavior) and matches binary reads on python 3,
    # where the '' sentinel would never match and loop forever
    for chunk in iter(read_chunk, b''):
        digest.update(chunk)
    return digest.hexdigest()
|
||||
|
||||
|
||||
def get_upload_path(instance, filename=None):
    """Return a relative upload path for a ForwardFile-like *instance*.

    Constraints imposed by openADS:
    - the filename must be shorter than 50 characters
    - the name should be unique, even when the content is identical
    hence the timestamp + hash-prefix naming scheme.
    """
    reference_name = instance.orig_filename if instance.orig_filename else filename
    # file_hash and content_type are normally filled on save(); make sure
    # they are available even for a not-yet-saved instance
    instance.update_file_hash(only_if_empty=True)
    instance.update_content_type(only_if_empty=True)
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%Hh%Mm%Ss%f')
    extension = get_file_extension(reference_name, instance.content_type)[:5]
    return 'to_openADS__%s__%s%s' % (timestamp, instance.file_hash[:4], extension)
|
||||
|
||||
|
||||
def get_file_extension(filename, mimetype=None):
    """Return the extension of the file (dot included), or '' if unknown.

    The extension is taken from *filename* when it contains a dot,
    otherwise it is guessed from *mimetype* when provided.
    """
    if filename and '.' in filename:
        # everything from the last dot (included) to the end
        return filename[filename.rfind('.'):]
    if mimetype:
        guessed = mimetypes.guess_extension(mimetype)
        if guessed:
            return guessed
    return ''
|
||||
|
||||
|
||||
def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
    """Recursively truncate string values (not dict keys) longer than *limit*.

    Containers (dict/list/tuple) are walked in place; *visited* tracks
    already-processed containers to avoid infinite recursion on cycles.
    """
    visited = [] if visited is None else visited
    if value in visited:
        return value
    if isinstance(value, basestring):
        return value[:limit] + truncate_text if len(value) > limit else value
    if isinstance(value, (dict, list, tuple)):
        visited.append(value)
        items = value.iteritems() if isinstance(value, dict) else enumerate(value)
        for key, item in items:
            value[key] = trunc_str_values(item, limit, visited, truncate_text)
    return value
|
||||
|
||||
|
||||
class DictDumper(object):
    """Helper that lazily dumps a dictionary to a string representation.

    Only applied when the dict is converted to a string (lazy processing):
    - long string values are truncated (after the dict has been deep-copied)
    - (optionally) the dict is converted with json.dumps instead of unicode().
    """

    def __init__(self, dic, max_str_len=255, use_json_dumps=True):
        """Arguments:
        - dic            the dict to dump
        - max_str_len    the maximum length of string values
        - use_json_dumps True to use json.dumps(), else unicode()
        """
        self.dic = dic
        self.max_str_len = max_str_len
        self.use_json_dumps = use_json_dumps

    @force_encoded_string_output
    def __repr__(self):
        return u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
            self.dic, self.max_str_len, self.use_json_dumps)

    @force_encoded_string_output
    def __str__(self):
        # delegate to __unicode__(), encoded to bytes on python 2
        return unicode(self)

    def __unicode__(self):
        # work on a deep copy so the caller's dict is never mutated
        truncated = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
        if self.use_json_dumps:
            return unicode(json.dumps(truncated))
        return unicode(truncated)
|
||||
|
||||
|
||||
class BaseModel(object):
    """Mixin providing naming, field and URL helper methods.

    Intended to be inherited by every model of the application.
    """

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name of the class (helper for META option)."""
        return cls._meta.verbose_name

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the plural form of the verbose name of the class (helper for META option)."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_class_name(cls):
        """Return the (unqualified) class name."""
        return cls.__name__

    @classmethod
    def get_class_name_plural(cls):
        """Return the class name with a naive 's' appended."""
        return '%ss' % cls.get_class_name()

    @classmethod
    def get_class_name_dash_case(cls):
        """Return the class name in dash-case (e.g. 'forward-file')."""
        return to_dash_case(cls.get_class_name())

    @classmethod
    def get_class_name_plural_dash_case(cls):
        """Return the plural class name in dash-case."""
        return to_dash_case(cls.get_class_name_plural())

    @classmethod
    def get_class_name_title(cls):
        """Return the class name as a space-separated title (e.g. 'Forward File')."""
        return cls.get_class_name_dash_case().replace('-', ' ').title()

    @classmethod
    def get_class_name_plural_title(cls):
        """Return the plural class name as a space-separated title."""
        return cls.get_class_name_plural_dash_case().replace('-', ' ').title()

    @classmethod
    def get_fields(cls):
        """Return the fields of the class (helper for META option)."""
        return cls._meta.get_fields(include_parents=True, include_hidden=False)

    @force_encoded_string_output
    def __str__(self):
        # delegate to the subclass __unicode__(), encoded on python 2
        return unicode(self)

    def get_fields_kv(self):
        """Return the model's list of (field, value) tuples (mainly for the views)."""
        return [(field, getattr(self, field.name, None))
                for field in self._meta.get_fields()]

    def get_url_name(self, prefix='', plural=False):
        """Return the URL pattern name for this model, optionally prefixed."""
        cls = self.__class__
        name = cls.get_class_name_plural_dash_case() if plural else cls.get_class_name_dash_case()
        return '%s-%s' % (prefix, name) if prefix else name

    def get_url_params(self, primary_key=True):
        """Return the URL kwargs identifying this instance."""
        return {'pk': self.id} if primary_key else {}

    def get_absolute_url(self):
        """Return the URL to view this instance."""
        return reverse_lazy(self.get_url_name('view'), kwargs=self.get_url_params())

    def get_edit_url(self):
        """Return the URL to edit this instance."""
        return reverse_lazy(self.get_url_name('edit'), kwargs=self.get_url_params())

    def get_delete_url(self):
        """Return the URL to delete this instance."""
        return reverse_lazy(self.get_url_name('delete'), kwargs=self.get_url_params())

    def get_list_url(self):
        """Return the URL to the list of instances of this model."""
        return reverse_lazy(self.get_url_name('list', True), kwargs=self.get_url_params(False))
|
||||
|
|
@ -0,0 +1,319 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from django.urls import reverse_lazy
|
||||
|
||||
from django.views.generic.detail import DetailView
|
||||
from django.views.generic.list import ListView
|
||||
from django.views.generic.edit import CreateView, UpdateView, DeleteView
|
||||
|
||||
from passerelle.views import GenericConnectorView
|
||||
|
||||
from .models import ForwardFile, Collectivite, Guichet, AtrealOpenads
|
||||
from .forms import ForwardFileForm, CollectiviteForm, GuichetForm
|
||||
|
||||
|
||||
def get_connecteur_from_request(view, key='connecteur'):
    """Return (and cache on *view*) the 'connecteur' object for the view.

    The connecteur slug is read from the resolved URL kwargs of the view's
    request; the resolved object is cached as ``view.connecteur``.
    Returns None when it cannot be resolved.
    """
    # BUGFIX: the original condition
    # ``not hasattr(...) or not view.connecteur and view.request``
    # parsed as ``A or (B and C)``, so the lookup branch ran (and
    # dereferenced view.request.resolver_match) even when request was None.
    if (not hasattr(view, 'connecteur') or not view.connecteur) and view.request:
        connecteur_slug = view.request.resolver_match.kwargs.get(key, None)
        if connecteur_slug:
            view.connecteur = AtrealOpenads.objects.get(slug=connecteur_slug)
    return view.connecteur if hasattr(view, 'connecteur') else None
|
||||
|
||||
|
||||
def get_collectivite_from_request(view, key='collectivite'):
    """Return (and cache on *view*) the 'collectivite' object for the view.

    The collectivite id is read from the resolved URL kwargs of the view's
    request; the resolved object is cached as ``view.collectivite``.
    Returns None when it cannot be resolved.
    """
    # BUGFIX: same operator-precedence problem as get_connecteur_from_request:
    # the original condition dereferenced view.request even when it was None.
    if (not hasattr(view, 'collectivite') or not view.collectivite) and view.request:
        collectivite_id = view.request.resolver_match.kwargs.get(key, None)
        if collectivite_id:
            view.collectivite = Collectivite.objects.get(id=collectivite_id)
    return view.collectivite if hasattr(view, 'collectivite') else None
|
||||
|
||||
|
||||
class ForwardFileView(DetailView):
    """Detail view of a single ForwardFile."""
    model = ForwardFile
    template_name = 'atreal_openads/manage/forwardfile_view.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(ForwardFileView, self).get_context_data(*args, **kwargs)
        # expose the current connecteur to the template (breadcrumb)
        ctx['connecteur'] = get_connecteur_from_request(self)
        return ctx
|
||||
|
||||
|
||||
class ForwardFileListView(ListView):
    """Paginated list of ForwardFiles for a connecteur (optionally scoped to a collectivite)."""
    model = ForwardFile
    template_name = 'atreal_openads/manage/forwardfile_list.html'
    paginate_by = 50
    ordering = 'id'

    def get_queryset(self):
        # restrict to the current connecteur, and to the collectivite if any
        filters = {'connecteur': get_connecteur_from_request(self)}
        collectivite = get_collectivite_from_request(self)
        if collectivite:
            filters['collectivite'] = collectivite
        qset = super(ForwardFileListView, self).get_queryset().filter(**filters)

        # honour a '?order-by=' GET parameter when it names a model field
        requested = self.request.GET.get('order-by', None)
        if requested:
            field_name = requested[1:] if requested.startswith('-') else requested
            if field_name in [f.name for f in self.model.get_fields()]:
                return qset.order_by(requested)
        return qset

    def get_context_data(self, *args, **kwargs):
        ctx = super(ForwardFileListView, self).get_context_data(*args, **kwargs)
        ctx.update({
            'connecteur': get_connecteur_from_request(self),
            'collectivite': get_collectivite_from_request(self),
        })
        return ctx
|
||||
|
||||
|
||||
class ForwardFileUpdateView(UpdateView):
    """Edit view for a ForwardFile."""
    model = ForwardFile
    form_class = ForwardFileForm
    template_name = 'atreal_openads/manage/forwardfile_form.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(ForwardFileUpdateView, self).get_context_data(*args, **kwargs)
        ctx.update({
            'connecteur': get_connecteur_from_request(self),
            'collectivite': get_collectivite_from_request(self),
        })
        return ctx

    def get_success_url(self, *args, **kwargs):
        """Redirect back to the list the user came from, else to the object."""
        back_to = self.request.GET.get('back-to')
        if back_to == 'list-forward-files':
            return reverse_lazy('list-forward-files', kwargs={
                'connecteur': get_connecteur_from_request(self).slug,
            })
        if back_to == 'col-list-forward-files':
            obj = self.get_object()
            if obj.collectivite:
                return reverse_lazy('col-list-forward-files', kwargs={
                    'connecteur': get_connecteur_from_request(self).slug,
                    'collectivite': obj.collectivite.id,
                })
        return self.get_object().get_absolute_url()
|
||||
|
||||
|
||||
class ForwardFileDeleteView(DeleteView):
    """Delete view for a ForwardFile."""
    model = ForwardFile
    form_class = ForwardFileForm
    template_name = 'atreal_openads/manage/forwardfile_form.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(ForwardFileDeleteView, self).get_context_data(*args, **kwargs)
        ctx.update({
            'connecteur': get_connecteur_from_request(self),
            'collectivite': get_collectivite_from_request(self),
        })
        return ctx

    def get_success_url(self, *args, **kwargs):
        """Redirect to the originating list, else to the connecteur page."""
        back_to = self.request.GET.get('back-to')
        if back_to == 'list-forward-files':
            return reverse_lazy('list-forward-files', kwargs={
                'connecteur': get_connecteur_from_request(self).slug,
            })
        if back_to == 'col-list-forward-files':
            obj = self.get_object()
            if obj.collectivite:
                return reverse_lazy('col-list-forward-files', kwargs={
                    'connecteur': get_connecteur_from_request(self).slug,
                    'collectivite': obj.collectivite.id,
                })
        return reverse_lazy('view-connector', kwargs={
            'connector': 'atreal-openads',
            'slug': get_connecteur_from_request(self).slug,
        })
|
||||
|
||||
|
||||
class CollectiviteView(DetailView):
    """Detail view of a Collectivite, with shortcuts to its guichet and files."""
    model = Collectivite
    template_name = 'atreal_openads/manage/collectivite_view.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(CollectiviteView, self).get_context_data(*args, **kwargs)
        connecteur = get_connecteur_from_request(self)
        ctx['connecteur'] = connecteur
        ctx['guichet_add_url'] = reverse_lazy('create-guichet', kwargs={
            'connecteur': connecteur.slug,
            'collectivite': self.get_object().id})
        ctx['forward_files_list_url'] = reverse_lazy('col-list-forward-files', kwargs={
            'connecteur': connecteur.slug,
            'collectivite': self.get_object().id})
        return ctx
|
||||
|
||||
|
||||
class CollectiviteListView(ListView):
    """Paginated list of Collectivites for a connecteur."""
    model = Collectivite
    template_name = 'atreal_openads/manage/collectivite_list.html'
    paginate_by = 50
    ordering = 'id'

    def get_queryset(self):
        qset = super(CollectiviteListView, self).get_queryset().filter(
            connecteur=get_connecteur_from_request(self))

        # honour a '?order-by=' GET parameter when it names a model field
        requested = self.request.GET.get('order-by', None)
        if requested:
            field_name = requested[1:] if requested.startswith('-') else requested
            if field_name in [f.name for f in self.model.get_fields()]:
                return qset.order_by(requested)
        return qset

    def get_context_data(self, *args, **kwargs):
        ctx = super(CollectiviteListView, self).get_context_data(*args, **kwargs)
        connecteur = get_connecteur_from_request(self)
        ctx['connecteur'] = connecteur
        ctx['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
            'connecteur': connecteur.slug})
        return ctx
|
||||
|
||||
|
||||
class CollectiviteCreateView(CreateView):
    """Creation view for a Collectivite, bound to the current connecteur."""
    model = Collectivite
    form_class = CollectiviteForm
    template_name = 'atreal_openads/manage/collectivite_form.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(CollectiviteCreateView, self).get_context_data(*args, **kwargs)
        ctx['connecteur'] = get_connecteur_from_request(self)
        return ctx

    def get_form_kwargs(self):
        # the form needs the connecteur to attach the new collectivite to it
        form_kwargs = super(CollectiviteCreateView, self).get_form_kwargs()
        form_kwargs['connecteur'] = get_connecteur_from_request(self)
        return form_kwargs

    def get_success_url(self, *args, **kwargs):
        """Redirect to the collectivites list if asked, else to the connecteur page."""
        if self.request.GET.get('back-to') == 'list-collectivites':
            return reverse_lazy('list-collectivites', kwargs={
                'connecteur': get_connecteur_from_request(self).slug,
            })
        return reverse_lazy('view-connector', kwargs={
            'connector': 'atreal-openads',
            'slug': get_connecteur_from_request(self).slug,
        })
|
||||
|
||||
|
||||
class CollectiviteUpdateView(UpdateView):
    """Edit view for a Collectivite."""
    model = Collectivite
    form_class = CollectiviteForm
    template_name = 'atreal_openads/manage/collectivite_form.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(CollectiviteUpdateView, self).get_context_data(*args, **kwargs)
        ctx['connecteur'] = get_connecteur_from_request(self)
        return ctx

    def get_success_url(self, *args, **kwargs):
        """Redirect to the collectivites list if asked, else to the object."""
        if self.request.GET.get('back-to') == 'list-collectivites':
            return reverse_lazy('list-collectivites', kwargs={
                'connecteur': get_connecteur_from_request(self).slug,
            })
        return self.get_object().get_absolute_url()
|
||||
|
||||
|
||||
class CollectiviteDeleteView(DeleteView):
    """Delete view for a Collectivite."""
    model = Collectivite
    form_class = CollectiviteForm
    template_name = 'atreal_openads/manage/collectivite_form.html'

    def get_context_data(self, *args, **kwargs):
        ctx = super(CollectiviteDeleteView, self).get_context_data(*args, **kwargs)
        ctx['connecteur'] = get_connecteur_from_request(self)
        return ctx

    def get_success_url(self, *args, **kwargs):
        """Redirect to the collectivites list if asked, else to the connecteur page."""
        if self.request.GET.get('back-to') == 'list-collectivites':
            return reverse_lazy('list-collectivites', kwargs={
                'connecteur': get_connecteur_from_request(self).slug,
            })
        return reverse_lazy('view-connector', kwargs={
            'connector': 'atreal-openads',
            'slug': get_connecteur_from_request(self).slug,
        })
|
||||
|
||||
|
||||
class GuichetView(DetailView):
|
||||
model = Guichet
|
||||
template_name = 'atreal_openads/manage/guichet_view.html'
|
||||
|
||||
def get_context_data(self, *args, **kwargs):
|
||||
context = super(GuichetView, self).get_context_data(*args, **kwargs)
|
||||
context['connecteur'] = get_connecteur_from_request(self)
|
||||
context['collectivite'] = get_collectivite_from_request(self)
|
||||
return context
|
||||
|
||||
|
||||
class GuichetCreateView(CreateView):
|
||||
model = Guichet
|
||||
form_class = GuichetForm
|
||||
template_name = 'atreal_openads/manage/guichet_form.html'
|
||||
|
||||
def get_form_kwargs(self):
|
||||
kwargs = super(GuichetCreateView, self).get_form_kwargs()
|
||||
kwargs['collectivite'] = get_collectivite_from_request(self)
|
||||
return kwargs
|
||||
|
||||
def get_context_data(self, *args, **kwargs):
|
||||
context = super(GuichetCreateView, self).get_context_data(*args, **kwargs)
|
||||
context['connecteur'] = get_connecteur_from_request(self)
|
||||
context['collectivite'] = get_collectivite_from_request(self)
|
||||
return context
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse_lazy('view-collectivite', kwargs={
|
||||
'connecteur': get_connecteur_from_request(self).slug,
|
||||
'pk' : get_collectivite_from_request(self).id
|
||||
})
|
||||
|
||||
|
||||
class GuichetUpdateView(UpdateView):
|
||||
model = Guichet
|
||||
form_class = GuichetForm
|
||||
template_name = 'atreal_openads/manage/guichet_form.html'
|
||||
|
||||
def get_context_data(self, *args, **kwargs):
|
||||
context = super(GuichetUpdateView, self).get_context_data(*args, **kwargs)
|
||||
context['connecteur'] = get_connecteur_from_request(self)
|
||||
context['collectivite'] = get_collectivite_from_request(self)
|
||||
return context
|
||||
|
||||
|
||||
class GuichetDeleteView(DeleteView):
|
||||
model = Guichet
|
||||
form_class = GuichetForm
|
||||
template_name = 'atreal_openads/manage/guichet_form.html'
|
||||
|
||||
def get_context_data(self, *args, **kwargs):
|
||||
context = super(GuichetDeleteView, self).get_context_data(*args, **kwargs)
|
||||
context['connecteur'] = get_connecteur_from_request(self)
|
||||
context['collectivite'] = get_collectivite_from_request(self)
|
||||
return context
|
||||
|
||||
def get_success_url(self, *args, **kwargs):
|
||||
return reverse_lazy('view-collectivite', kwargs={
|
||||
'connecteur': get_connecteur_from_request(self).slug,
|
||||
'pk' : get_collectivite_from_request(self).id
|
||||
})
|
||||
|
||||
|
||||
class AtrealOpenadsView(GenericConnectorView):
|
||||
model = AtrealOpenads
|
||||
template_name = 'atreal_openads/manage/connector_view.html'
|
||||
|
||||
def get_context_data(self, *args, **kwargs):
|
||||
context = super(AtrealOpenadsView, self).get_context_data(*args, **kwargs)
|
||||
context['collectivite_fields'] = Collectivite.get_fields()
|
||||
context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
|
||||
'connecteur': self.get_object().slug})
|
||||
return context
|
||||
|
|
@ -12,32 +12,39 @@ import json
|
|||
import os
|
||||
import base64
|
||||
import re
|
||||
import datetime
|
||||
import magic
|
||||
|
||||
from requests import Response
|
||||
|
||||
from django.http import Http404
|
||||
from django.http.request import HttpRequest, QueryDict
|
||||
from django.http.response import JsonResponse
|
||||
from django.core.files import File
|
||||
from django.core.files.base import ContentFile
|
||||
#from django.db.models.query import QuerySet
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from passerelle.utils.jsonresponse import APIError
|
||||
from passerelle.base.models import Job
|
||||
|
||||
from atreal_openads.models import (
|
||||
strip_tags,
|
||||
clean_spaces,
|
||||
normalize,
|
||||
from atreal_openads.utils import (
|
||||
get_file_data,
|
||||
get_upload_path,
|
||||
trunc_str_values,
|
||||
DictDumper,
|
||||
AtrealOpenads,
|
||||
ForwardFile
|
||||
get_file_digest,
|
||||
trunc_str_values
|
||||
)
|
||||
|
||||
from atreal_openads.models import (
|
||||
ForwardFile,
|
||||
Guichet,
|
||||
Collectivite,
|
||||
AtrealOpenads
|
||||
)
|
||||
|
||||
|
||||
CONNECTOR_NAME = 'atreal-openads'
|
||||
CONNECTOR_SLUG = 'atreal'
|
||||
COLLECTIVITE = 3
|
||||
COLLECTIVITE = 79
|
||||
OPENADS_API_LOGIN = 'publik-passerelle'
|
||||
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
|
||||
OPENADS_API_URL = 'http://openads.api/'
|
||||
|
@ -55,114 +62,221 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
|
|||
def atreal_openads(db):
|
||||
return AtrealOpenads.objects.create(
|
||||
slug = CONNECTOR_SLUG,
|
||||
collectivite = COLLECTIVITE,
|
||||
default_collectivite_openADS_id = COLLECTIVITE,
|
||||
openADS_API_url = OPENADS_API_URL,
|
||||
basic_auth_username = OPENADS_API_LOGIN,
|
||||
basic_auth_password = OPENADS_API_PASSWORD
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1(db, atreal_openads):
|
||||
return Collectivite.objects.create(
|
||||
name = u'Macollectivité',
|
||||
connecteur = atreal_openads,
|
||||
openADS_id = '3'
|
||||
)
|
||||
|
||||
def test_strip_tags():
|
||||
s = 'aaa b cc '
|
||||
assert strip_tags(s) == s
|
||||
|
||||
ss = s + '<em>dd'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + ' 1 < 3'
|
||||
assert strip_tags(ss) == s + ' 1 < 3'
|
||||
@pytest.fixture
|
||||
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
|
||||
return Guichet.objects.create(
|
||||
collectivite = collectivite_1,
|
||||
ouverture_jour_h = datetime.time(9, 0),
|
||||
fermeture_jour_h = datetime.time(17, 0),
|
||||
ouverture_sem_d = 1, # Lundi
|
||||
fermeture_sem_d = 5, # Vendredi
|
||||
ouverture_sem_h = datetime.time(8, 30),
|
||||
fermeture_sem_h = datetime.time(12, 15)
|
||||
)
|
||||
|
||||
|
||||
def test_clean_spaces():
|
||||
s = 'aaa b cc '
|
||||
assert clean_spaces(s) == 'aaa b cc'
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert clean_spaces(s) == 'a a b b c c d d'
|
||||
def upload2ForwardFile(connecteur, path, numero_dossier, type_fichier):
|
||||
"""Convert a file path to a ForwardFile."""
|
||||
if path:
|
||||
rand_id = base64.urlsafe_b64encode(os.urandom(6))
|
||||
fwd_file = ForwardFile()
|
||||
fwd_file.connecteur = connecteur
|
||||
fwd_file.numero_demande = rand_id
|
||||
fwd_file.numero_dossier = numero_dossier
|
||||
fwd_file.type_fichier = type_fichier
|
||||
fwd_file.orig_filename = os.path.basename(path)
|
||||
fwd_file.content_type = magic.from_file(path, mime=True)
|
||||
with open(path, 'r') as fp:
|
||||
fwd_file.file_hash = get_file_digest(fp)
|
||||
fwd_file.upload_file = File(open(path, 'r'))
|
||||
fwd_file.upload_status = 'pending'
|
||||
return fwd_file
|
||||
return None
|
||||
|
||||
|
||||
def test_normalize():
|
||||
assert normalize(None) == ''
|
||||
def test_forward_file(atreal_openads):
|
||||
ff = ForwardFile(
|
||||
numero_demande='45641531',
|
||||
numero_dossier=FAKE_NUMERO_DOSSIER,
|
||||
type_fichier='CERFA',
|
||||
orig_filename='afile',
|
||||
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
|
||||
upload_status='pending',
|
||||
connecteur=atreal_openads,
|
||||
collectivite=None
|
||||
)
|
||||
ff.upload_file.save(ff.orig_filename, ContentFile(get_file_data(TEST_FILE_CERFA_DIA)))
|
||||
ff.save()
|
||||
|
||||
s = 'aaa b cc '
|
||||
assert normalize(s) == 'aaa b cc'
|
||||
assert repr(ff) == (
|
||||
u'ForwardFile(id=%s,connecteur=%s,collectivite=%s'
|
||||
',demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % (
|
||||
ff.id, unicode(ff.connecteur), None,
|
||||
ff.numero_demande, ff.numero_dossier,
|
||||
ff.type_fichier, ff.orig_filename, ff.upload_status
|
||||
)
|
||||
).encode('utf-8')
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert normalize(s) == 'a a b b c c d d'
|
||||
assert str(ff) == '%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending')
|
||||
assert unicode(ff) == u'%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending')
|
||||
|
||||
assert ff.get_status() == 'Pending'
|
||||
assert ff.get_status('invalid') == 'invalid'
|
||||
|
||||
params = ff.get_url_params()
|
||||
assert params['connecteur'] == atreal_openads.slug
|
||||
|
||||
assert ff.upload_file is not None
|
||||
assert ff.upload_file.size > 0
|
||||
assert ff.size == ff.upload_file.size
|
||||
assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887'
|
||||
|
||||
ff.content_type = 'application/pdf'
|
||||
ff.upload_status = 'success'
|
||||
ff.save()
|
||||
assert ff.upload_status == 'success'
|
||||
assert ff.get_status() == 'Success'
|
||||
assert ff.content_type == 'application/pdf'
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
ff.upload_file.size
|
||||
assert unicode(e.value) == "The 'upload_file' attribute has no file associated with it."
|
||||
assert ff.size > 0
|
||||
assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887'
|
||||
|
||||
ff.file_hash = ''
|
||||
ff.update_file_hash()
|
||||
ff.update_content_type()
|
||||
ff.save()
|
||||
assert ff.file_hash == ''
|
||||
assert ff.content_type == ''
|
||||
|
||||
ff.orig_filename = ''
|
||||
with pytest.raises(ValidationError) as e:
|
||||
ff.save()
|
||||
assert len(e.value.messages) == 1
|
||||
assert '__all__' in e.value.message_dict
|
||||
assert unicode(e.value.message_dict['__all__'][0]) == u"A %s cannot have all the following fields empty: %s." % (
|
||||
ff.get_verbose_name(),
|
||||
['file_hash', 'orig_filename', 'upload_file'])
|
||||
|
||||
ff.delete()
|
||||
|
||||
|
||||
def test_get_file_data():
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
|
||||
def test_collectivite(collectivite_1, collectivite_1_guichet):
|
||||
col = collectivite_1
|
||||
|
||||
assert repr(col) == (
|
||||
u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % (
|
||||
1, unicode(col.name), unicode(col.connecteur), col.openADS_id,
|
||||
unicode(col.guichet) if hasattr(col, 'guichet') else None
|
||||
)
|
||||
).encode('utf-8')
|
||||
|
||||
assert str(col) == col.name.encode('utf-8')
|
||||
|
||||
assert unicode(col) == col.name
|
||||
|
||||
class_fields = Collectivite.get_fields()
|
||||
assert len(class_fields) == 6
|
||||
assert class_fields[0].name == 'id'
|
||||
assert class_fields[1].name == 'name'
|
||||
assert class_fields[2].name == 'connecteur'
|
||||
assert class_fields[3].name == 'openADS_id'
|
||||
assert class_fields[4].name == 'guichet'
|
||||
assert class_fields[5].name == 'forward_file'
|
||||
|
||||
instance_fields = col.get_fields_kv()
|
||||
assert len(instance_fields) == 6
|
||||
assert instance_fields[0][0].name == 'id'
|
||||
assert instance_fields[1][0].name == 'name'
|
||||
assert instance_fields[2][0].name == 'connecteur'
|
||||
assert instance_fields[3][0].name == 'openADS_id'
|
||||
assert instance_fields[4][0].name == 'guichet'
|
||||
assert instance_fields[5][0].name == 'forward_file'
|
||||
assert instance_fields[0][1] == col.id
|
||||
assert instance_fields[1][1] == col.name
|
||||
assert instance_fields[2][1] is col.connecteur
|
||||
assert instance_fields[3][1] == col.openADS_id
|
||||
assert instance_fields[4][1] is col.guichet
|
||||
assert instance_fields[5][1] is None # shouldn't it be QuerySet?
|
||||
|
||||
params = col.get_url_params()
|
||||
assert params['connecteur'] == col.connecteur.slug
|
||||
|
||||
|
||||
def test_get_upload_path(app, atreal_openads):
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
assert re.search(
|
||||
r"^pass_openADS_up_%s_%s$" %
|
||||
('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'),
|
||||
get_upload_path(FF))
|
||||
def test_guichet(collectivite_1_guichet):
|
||||
g = collectivite_1_guichet
|
||||
|
||||
assert repr(g) == (
|
||||
u'Guichet(id=%s,collectivite=%s,%s)' % (
|
||||
1, unicode(g.collectivite), unicode(g)
|
||||
)
|
||||
).encode('utf-8')
|
||||
|
||||
assert str(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]'.encode('utf-8')
|
||||
|
||||
assert unicode(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]'
|
||||
|
||||
params = g.get_url_params()
|
||||
assert params['collectivite'] == g.collectivite.id
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
g.get_list_url()
|
||||
assert unicode(e.value) == u"Guichet:get_list_url() method should not be called"
|
||||
|
||||
|
||||
def test_trunc_str_values():
|
||||
d = {}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 0) == {'a': u'…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 1) == {'a': u'1…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 2) == {'a': u'12…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 9) == {'a': u'123456789'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
def test_guichet_is_open(collectivite_1_guichet):
|
||||
g = collectivite_1_guichet
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
|
||||
dt_fmt = '%Y-%m-%d %H:%M'
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
|
||||
d_monday = '2019-07-29'
|
||||
d_sunday = '2019-07-28'
|
||||
d_saturday = '2019-07-27'
|
||||
d_friday = '2019-07-26'
|
||||
d_thursday = '2019-07-25'
|
||||
d_wednesday = '2019-07-24'
|
||||
d_tuesday = '2019-07-22'
|
||||
t_open = '10:44'
|
||||
t_closed_before = '6:33'
|
||||
t_closed_after = '20:08'
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
|
||||
for d in [d_monday, d_tuesday, d_wednesday, d_thursday, d_friday]:
|
||||
for t in [(t_open, True), (t_closed_before, False), (t_closed_after, False)]:
|
||||
dt = datetime.datetime.strptime(d + ' ' + t[0], dt_fmt)
|
||||
assert g.is_open(dt) == t[1]
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
|
||||
dt = datetime.datetime.strptime(d_friday + ' 16:12', dt_fmt)
|
||||
assert g.is_open(dt) == False
|
||||
|
||||
for d in [d_saturday, d_sunday]:
|
||||
for t in [t_open, t_closed_before, t_closed_after]:
|
||||
dt = datetime.datetime.strptime(d + ' ' + t, dt_fmt)
|
||||
assert g.is_open(dt) == False
|
||||
|
||||
with pytest.raises(TypeError) as e:
|
||||
g.is_open('invalid datetime')
|
||||
assert unicode(e.value) == u"is_open() expect a datetime object (not a %s)" % type('')
|
||||
|
||||
assert g.is_open(None) == False
|
||||
|
||||
|
||||
def test_dict_dumper():
|
||||
d = {}
|
||||
dd = DictDumper(d, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 0, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
|
||||
d = {'a': '123456789'}
|
||||
dd = DictDumper(d, 10, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 5, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == unicode({'a': u'12345…'})
|
||||
dd = DictDumper(d, 5, use_json_dumps=True)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == u'{"a": "12345\\u2026"}'
|
||||
|
||||
|
||||
def test_openads_log_json_payload(app, atreal_openads):
|
||||
def test_openads_log_json_payload(atreal_openads):
|
||||
# TODO implement
|
||||
assert True
|
||||
# change the debug file path
|
||||
|
@ -170,7 +284,7 @@ def test_openads_log_json_payload(app, atreal_openads):
|
|||
# check that what was is logged is correct
|
||||
|
||||
|
||||
def test_openads_get_files_from_json_payload(app, atreal_openads):
|
||||
def test_openads_get_files_from_json_payload(atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}]
|
||||
|
@ -194,7 +308,7 @@ def test_openads_get_files_from_json_payload(app, atreal_openads):
|
|||
assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title)
|
||||
|
||||
|
||||
def test_check_file_dict(app, atreal_openads):
|
||||
def test_check_file_dict(atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
d = {
|
||||
|
@ -235,7 +349,7 @@ def test_check_file_dict(app, atreal_openads):
|
|||
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title)
|
||||
|
||||
|
||||
def test_get_first_file_from_json_payload(app, atreal_openads):
|
||||
def test_get_first_file_from_json_payload(atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
d = {
|
||||
|
@ -250,7 +364,7 @@ def test_get_first_file_from_json_payload(app, atreal_openads):
|
|||
d, title, ensure_content=True, b64=False) == d['files'][0]
|
||||
|
||||
|
||||
def test_openads_check_status(app, atreal_openads):
|
||||
def test_openads_check_status(atreal_openads):
|
||||
fake_resp_json = {
|
||||
'message': 'Service online'
|
||||
}
|
||||
|
@ -261,7 +375,7 @@ def test_openads_check_status(app, atreal_openads):
|
|||
assert jresp['response'] == 200
|
||||
|
||||
|
||||
def test_openads_create_dossier(app, atreal_openads):
|
||||
def test_openads_create_dossier(atreal_openads, collectivite_1, collectivite_1_guichet):
|
||||
fake_req_json = {
|
||||
"fields": {
|
||||
|
||||
|
@ -272,6 +386,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
# mandataire
|
||||
"mandataire_prenom" : "John",
|
||||
"mandataire_nom" : "Man",
|
||||
"mandataire_email" : "mandataire_email@domain.example",
|
||||
|
||||
"mandataire_qualite" : "Une personne morale",
|
||||
"mandataire_qualite_raw" : "Une personne morale",
|
||||
|
@ -288,6 +403,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
# petitionnaire
|
||||
"prenom": "Toto",
|
||||
"nom" : "Loulou",
|
||||
"email" : "petitionnaire_email@domain.example",
|
||||
|
||||
"qualite" : "Un particulier",
|
||||
"qualite_raw": "Un particulier",
|
||||
|
@ -325,12 +441,12 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
"plan_cadastral_2": {
|
||||
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
|
||||
"content_type": "application/pdf",
|
||||
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
|
||||
#"filename" : 'plan_cad'
|
||||
},
|
||||
"pouvoir_mandat": {
|
||||
"content" : get_file_data(TEST_FILE_CERFA_DIA),
|
||||
"content_type": "application/pdf",
|
||||
"filename" : 'mandat.pdf'
|
||||
"filename" : 'mandat'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -354,15 +470,50 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
with pytest.raises(ValueError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.create_dossier(req, 'DIA',collectivite='not an integer')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite='not an integer')
|
||||
assert unicode(e.value) == "invalid literal for int() with base 10: 'not an integer'"
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert unicode(e.value) == "HTTP error: 502"
|
||||
|
||||
# TODO update the code and return message when it will be
|
||||
# correctly implemented in the openADS.API side.
|
||||
fake_resp_404 = Response()
|
||||
fake_resp_404.status_code = 404
|
||||
fake_resp_404.reason = 'Page not found'
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_404
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite=999)
|
||||
assert unicode(e.value) == "HTTP error: 404"
|
||||
|
||||
# guichet is open from Monday/8:30 to Friday/12:15, between 9:00 and 17:00
|
||||
now = datetime.datetime(2019, 8, 10, 16, 0, 0)
|
||||
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
|
||||
assert jresp is not None
|
||||
assert len(jresp) == 1
|
||||
assert 'message' in jresp
|
||||
assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1
|
||||
|
||||
now = '2019-08-10 16:00:00'
|
||||
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
|
||||
assert jresp is not None
|
||||
assert len(jresp) == 1
|
||||
assert 'message' in jresp
|
||||
assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1
|
||||
|
||||
now = {'invalid': 'type'}
|
||||
with pytest.raises(APIError) as e:
|
||||
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
|
||||
assert unicode(e.value) == u"Invalid value of type '%s' for now argument of endpoint '%s' (must be: %s)" % (
|
||||
type(now),
|
||||
'create_dossier',
|
||||
"datetime or string formatted to '%s'" % '%Y-%m-%d %H:%M:%S')
|
||||
|
||||
fake_resp_json = {
|
||||
'numero_dossier' : FAKE_NUMERO_DOSSIER,
|
||||
'files': [{
|
||||
|
@ -380,7 +531,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
jresp = atreal_openads.create_dossier(req, 'DIA')
|
||||
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert jresp['numero_dossier'] == fake_resp_json['numero_dossier']
|
||||
assert jresp['recepisse']['b64_content'] == fake_resp_json['files'][0]['b64_content']
|
||||
assert jresp['recepisse']['content_type'] == 'application/pdf'
|
||||
|
@ -391,7 +542,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % (
|
||||
'numero_dossier', 'string', type({}))
|
||||
|
||||
|
@ -400,7 +551,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response"
|
||||
|
||||
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
|
||||
|
@ -408,14 +559,14 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert unicode(e.value) == u'Failed to decode recepisse content from base 64'
|
||||
|
||||
fake_resp._content = 'df[{gfd;g#vfd'
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
|
||||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
|
||||
|
@ -425,7 +576,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
assert job.method_name == 'upload_user_files'
|
||||
assert job.natural_id == FAKE_NUMERO_DOSSIER
|
||||
assert job.parameters is not None
|
||||
assert len(job.parameters) == 3
|
||||
assert len(job.parameters) == 4
|
||||
assert 'file_ids' in job.parameters
|
||||
assert len(job.parameters['file_ids']) == 4
|
||||
file_ids = job.parameters['file_ids']
|
||||
|
@ -456,7 +607,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
assert FF.upload_status == 'success'
|
||||
|
||||
|
||||
def test_openads_get_dossier(app, atreal_openads):
|
||||
def test_openads_get_dossier(atreal_openads):
|
||||
fake_resp_bad = Response()
|
||||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
@ -513,34 +664,7 @@ def test_openads_get_dossier(app, atreal_openads):
|
|||
assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD"
|
||||
|
||||
|
||||
def test_openads_upload2ForwardFile(app, atreal_openads):
|
||||
FF = atreal_openads.upload2ForwardFile(None, None, None)
|
||||
assert FF is None
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
assert isinstance(FF, ForwardFile)
|
||||
assert len(FF.numero_demande) > 0
|
||||
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
|
||||
assert FF.type_fichier == 'cerfa'
|
||||
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
|
||||
assert FF.content_type == 'application/pdf'
|
||||
assert len(FF.file_hash) > 0
|
||||
assert isinstance(FF.upload_file, File)
|
||||
assert FF.upload_status == 'pending'
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_PLAN_CADASTRAL, FAKE_NUMERO_DOSSIER, 'plan')
|
||||
assert isinstance(FF, ForwardFile)
|
||||
assert len(FF.numero_demande) > 0
|
||||
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
|
||||
assert FF.type_fichier == 'plan'
|
||||
assert FF.orig_filename == os.path.basename(TEST_FILE_PLAN_CADASTRAL)
|
||||
assert FF.content_type == 'application/pdf'
|
||||
assert len(FF.file_hash) > 0
|
||||
assert isinstance(FF.upload_file, File)
|
||||
assert FF.upload_status == 'pending'
|
||||
|
||||
|
||||
def test_openads_get_fwd_files(app, atreal_openads):
|
||||
def test_openads_get_fwd_files(atreal_openads):
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer')
|
||||
assert unicode(e.value) == u"fichier_id must be an integer"
|
||||
|
@ -553,7 +677,7 @@ def test_openads_get_fwd_files(app, atreal_openads):
|
|||
assert resp_empty is not None
|
||||
assert len(resp_empty) == 0
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
|
||||
|
@ -576,12 +700,12 @@ def test_openads_get_fwd_files(app, atreal_openads):
|
|||
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
|
||||
|
||||
|
||||
def test_openads_get_fwd_files_status(app, atreal_openads):
|
||||
def test_openads_get_fwd_files_status(atreal_openads):
|
||||
with pytest.raises(Http404) as e:
|
||||
atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
|
||||
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
|
||||
|
@ -606,7 +730,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
|
|||
assert len(jresp['failed']) == 0
|
||||
|
||||
|
||||
def test_openads_get_courrier(app, atreal_openads):
|
||||
def test_openads_get_courrier(atreal_openads):
|
||||
lettre_type = 'dia_renonciation_preempter'
|
||||
|
||||
fake_resp_bad = Response()
|
||||
|
@ -655,7 +779,7 @@ def test_openads_get_courrier(app, atreal_openads):
|
|||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
|
||||
def test_get_response_error(app, atreal_openads):
|
||||
def test_get_response_error(atreal_openads):
|
||||
fake_resp_json = {
|
||||
'errors': [
|
||||
{
|
||||
|
@ -685,16 +809,34 @@ def test_get_response_error(app, atreal_openads):
|
|||
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content)
|
||||
|
||||
|
||||
def test_openads_upload_user_files(app, atreal_openads):
|
||||
def test_openads_upload_user_files(atreal_openads):
|
||||
|
||||
# TODO check logs (because this doesn't do anything but log)
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[])
|
||||
req = HttpRequest()
|
||||
req._body = ''
|
||||
req.path = '/upload_user_files'
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_type = 'application/json'
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
|
||||
with pytest.raises(ForwardFile.DoesNotExist) as e:
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
|
||||
assert unicode(e.value) == u"ForwardFile matching query does not exist."
|
||||
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
|
||||
assert unicode(e.value) == u"The following ForwardFile IDs were not found: %s." % [999]
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
with pytest.raises(ValueError) as e:
|
||||
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids='invalid string')
|
||||
assert unicode(e.value) == u"invalid literal for int() with base 10: '%s'" % 'invalid string'
|
||||
|
||||
with pytest.raises(TypeError) as e:
|
||||
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids={'invalid':'type'})
|
||||
assert unicode(e.value) == u"Invalid 'file_ids' argument type '%s' (must be string or list)" % type({'invalid':'type'})
|
||||
|
||||
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
assert FF.upload_status == 'pending'
|
||||
|
@ -705,9 +847,11 @@ def test_openads_upload_user_files(app, atreal_openads):
|
|||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=str(file_id))
|
||||
assert unicode(e.value) == u'HTTP error: 502'
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
assert isinstance(FFup, ForwardFile)
|
||||
|
@ -717,6 +861,9 @@ def test_openads_upload_user_files(app, atreal_openads):
|
|||
assert FFup.upload_status == 'failed'
|
||||
assert FFup.upload_msg == "HTTP error: 502"
|
||||
|
||||
FFup.upload_status = 'pending'
|
||||
FFup.save()
|
||||
|
||||
fake_resp = Response()
|
||||
fake_resp.status_code = 200
|
||||
fake_resp.headers = {'Content-Type': 'application/json'}
|
||||
|
@ -724,9 +871,11 @@ def test_openads_upload_user_files(app, atreal_openads):
|
|||
fake_resp.reason = 'OK'
|
||||
|
||||
fake_resp._content = 'invalid_;{[content}'
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp.content
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
assert isinstance(FFup, ForwardFile)
|
||||
|
@ -736,11 +885,19 @@ def test_openads_upload_user_files(app, atreal_openads):
|
|||
assert FFup.upload_status == 'failed'
|
||||
assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert jresp == {'message': 'no file to transfer'}
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
FFup.upload_status = 'pending'
|
||||
FFup.save()
|
||||
|
||||
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert jresp == {'message': 'all files transfered successfully'}
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
assert isinstance(FFup, ForwardFile)
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pytest
|
||||
import os
|
||||
import base64
|
||||
import datetime
|
||||
|
||||
from django.core.files import File
|
||||
|
||||
from atreal_openads.forms import (
|
||||
ForwardFileForm,
|
||||
CollectiviteForm,
|
||||
GuichetForm
|
||||
)
|
||||
|
||||
from atreal_openads.models import (
|
||||
ForwardFile,
|
||||
Guichet,
|
||||
Collectivite,
|
||||
AtrealOpenads
|
||||
)
|
||||
|
||||
|
||||
CONNECTOR_NAME = 'atreal-openads'
|
||||
CONNECTOR_SLUG = 'atreal'
|
||||
COLLECTIVITE = 79
|
||||
OPENADS_API_LOGIN = 'publik-passerelle'
|
||||
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
|
||||
OPENADS_API_URL = 'http://openads.api/'
|
||||
|
||||
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
|
||||
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
|
||||
|
||||
TESTS_DIR = os.path.dirname(__file__)
|
||||
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
|
||||
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
|
||||
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atreal_openads(db):
|
||||
return AtrealOpenads.objects.create(
|
||||
slug = CONNECTOR_SLUG,
|
||||
default_collectivite_openADS_id = COLLECTIVITE,
|
||||
openADS_API_url = OPENADS_API_URL,
|
||||
basic_auth_username = OPENADS_API_LOGIN,
|
||||
basic_auth_password = OPENADS_API_PASSWORD
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1(db, atreal_openads):
|
||||
return Collectivite.objects.create(
|
||||
name = u'Macollectivité',
|
||||
connecteur = atreal_openads,
|
||||
openADS_id = '3'
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
|
||||
return Guichet.objects.create(
|
||||
collectivite = collectivite_1,
|
||||
ouverture_jour_h = datetime.time(9, 0),
|
||||
fermeture_jour_h = datetime.time(17, 0),
|
||||
ouverture_sem_d = 1, # Lundi
|
||||
fermeture_sem_d = 5, # Vendredi
|
||||
ouverture_sem_h = datetime.time(8, 30),
|
||||
fermeture_sem_h = datetime.time(12, 15)
|
||||
)
|
||||
|
||||
|
||||
def test_forwardfile_form(atreal_openads, collectivite_1):
|
||||
form = ForwardFileForm()
|
||||
assert form.instance is not None
|
||||
|
||||
ff = ForwardFile(
|
||||
connecteur = None,
|
||||
collectivite = None,
|
||||
numero_demande = '45641531',
|
||||
numero_dossier = FAKE_NUMERO_DOSSIER,
|
||||
type_fichier = 'CERFA',
|
||||
orig_filename = os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
content_type = 'application/pdf',
|
||||
file_hash = 'ffdf456fdsvgb4bgfb6g4f5b',
|
||||
upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')),
|
||||
upload_status = 'pending'
|
||||
)
|
||||
|
||||
form_with_instance = ForwardFileForm(instance=ff, collectivite=collectivite_1)
|
||||
assert form_with_instance.instance is ff
|
||||
assert form_with_instance.instance.collectivite is collectivite_1
|
||||
|
||||
form_with_instance = ForwardFileForm(instance=ff, connecteur=atreal_openads)
|
||||
assert form_with_instance.instance is ff
|
||||
assert form_with_instance.instance.connecteur is atreal_openads
|
||||
|
||||
# TODO check the queryset of the collectivite
|
||||
|
||||
|
||||
def test_collectivite_form(atreal_openads):
|
||||
form = CollectiviteForm()
|
||||
assert form.instance is not None
|
||||
|
||||
col = Collectivite(
|
||||
connecteur = None,
|
||||
name = u'Ma collectivité',
|
||||
openADS_id = 3
|
||||
)
|
||||
|
||||
form_with_instance = CollectiviteForm(instance=col, connecteur=atreal_openads)
|
||||
assert form_with_instance.instance is col
|
||||
assert form_with_instance.instance.connecteur is atreal_openads
|
||||
|
||||
|
||||
def test_guichet_form(atreal_openads, collectivite_1):
|
||||
form = GuichetForm()
|
||||
assert form.instance is not None
|
||||
|
||||
gui = Guichet(
|
||||
collectivite = None,
|
||||
ouverture_jour_h = datetime.time(9, 0, 0),
|
||||
fermeture_jour_h = datetime.time(18, 0, 0),
|
||||
ouverture_sem_d = 1,
|
||||
fermeture_sem_d = 5,
|
||||
ouverture_sem_h = datetime.time(10, 30, 0),
|
||||
fermeture_sem_h = datetime.time(12, 15, 0)
|
||||
)
|
||||
|
||||
form_with_instance = GuichetForm(instance=gui, collectivite=collectivite_1)
|
||||
assert form_with_instance.instance is gui
|
||||
assert form_with_instance.instance.collectivite is collectivite_1
|
|
@ -0,0 +1,288 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# to run it use the following command in the 'tests' directory:
|
||||
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv
|
||||
#
|
||||
# and with 'coverage':
|
||||
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads
|
||||
|
||||
import pytest
|
||||
import os
|
||||
import base64
|
||||
import re
|
||||
import datetime
|
||||
|
||||
from django.core.files import File
|
||||
from django.core.files.base import ContentFile
|
||||
|
||||
from atreal_openads.utils import (
|
||||
to_dash_case,
|
||||
force_encoded_string_output,
|
||||
strip_tags,
|
||||
clean_spaces,
|
||||
normalize,
|
||||
get_file_data,
|
||||
get_file_digest,
|
||||
get_upload_path,
|
||||
get_file_extension,
|
||||
trunc_str_values,
|
||||
DictDumper
|
||||
)
|
||||
|
||||
from atreal_openads.models import (
|
||||
ForwardFile,
|
||||
Guichet,
|
||||
Collectivite,
|
||||
AtrealOpenads
|
||||
)
|
||||
|
||||
|
||||
CONNECTOR_NAME = 'atreal-openads'
|
||||
CONNECTOR_SLUG = 'atreal'
|
||||
COLLECTIVITE = 79
|
||||
OPENADS_API_LOGIN = 'publik-passerelle'
|
||||
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
|
||||
OPENADS_API_URL = 'http://openads.api/'
|
||||
|
||||
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
|
||||
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
|
||||
|
||||
TESTS_DIR = os.path.dirname(__file__)
|
||||
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
|
||||
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
|
||||
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atreal_openads(db):
|
||||
return AtrealOpenads.objects.create(
|
||||
slug = CONNECTOR_SLUG,
|
||||
default_collectivite_openADS_id = COLLECTIVITE,
|
||||
openADS_API_url = OPENADS_API_URL,
|
||||
basic_auth_username = OPENADS_API_LOGIN,
|
||||
basic_auth_password = OPENADS_API_PASSWORD
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1(db, atreal_openads):
|
||||
return Collectivite.objects.create(
|
||||
name = u'Macollectivité',
|
||||
connecteur = atreal_openads,
|
||||
openADS_id = '3'
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
|
||||
return Guichet.objects.create(
|
||||
collectivite = collectivite_1,
|
||||
ouverture_jour_h = datetime.time(9, 0),
|
||||
fermeture_jour_h = datetime.time(17, 0),
|
||||
ouverture_sem_d = 1, # Lundi
|
||||
fermeture_sem_d = 5, # Vendredi
|
||||
ouverture_sem_h = datetime.time(8, 30),
|
||||
fermeture_sem_h = datetime.time(12, 15)
|
||||
)
|
||||
|
||||
|
||||
def test_to_dash_case():
|
||||
s = 'ACamelCaseName'
|
||||
assert to_dash_case(s) == 'a-camel-case-name'
|
||||
|
||||
assert to_dash_case('') == ''
|
||||
|
||||
|
||||
def test_force_encoded_string_output():
|
||||
def a_str_function():
|
||||
return str('toto')
|
||||
ret = force_encoded_string_output(a_str_function)()
|
||||
assert isinstance(ret, str)
|
||||
ret = force_encoded_string_output(a_str_function, 'latin1')()
|
||||
assert isinstance(ret, str)
|
||||
|
||||
def an_unicode_function():
|
||||
return u'toto'
|
||||
ret = force_encoded_string_output(an_unicode_function)()
|
||||
assert isinstance(ret, str)
|
||||
ret = force_encoded_string_output(an_unicode_function, 'latin1')()
|
||||
assert isinstance(ret, str)
|
||||
|
||||
|
||||
def test_strip_tags():
|
||||
s = 'aaa b cc '
|
||||
assert strip_tags(s) == s
|
||||
|
||||
ss = s + '<em>dd'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + ' 1 < 3'
|
||||
assert strip_tags(ss) == s + ' 1 < 3'
|
||||
|
||||
|
||||
def test_clean_spaces():
|
||||
s = 'aaa b cc '
|
||||
assert clean_spaces(s) == 'aaa b cc'
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert clean_spaces(s) == 'a a b b c c d d'
|
||||
|
||||
|
||||
def test_normalize():
|
||||
assert normalize(None) == ''
|
||||
|
||||
s = 'aaa b cc '
|
||||
assert normalize(s) == 'aaa b cc'
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert normalize(s) == 'a a b b c c d d'
|
||||
|
||||
|
||||
def test_get_file_data():
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
|
||||
|
||||
|
||||
def test_get_file_digest():
|
||||
with open(TEST_FILE_CERFA_DIA) as fd:
|
||||
assert get_file_digest(fd) == 'cc90a620982760fdee16a5b4fe1b5ac3b4fe868fd02d2f70b27f1e46d283ea51'
|
||||
|
||||
|
||||
def test_get_upload_path():
|
||||
ff = ForwardFile(
|
||||
numero_demande='45641531',
|
||||
numero_dossier=FAKE_NUMERO_DOSSIER,
|
||||
type_fichier='CERFA',
|
||||
orig_filename=os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
content_type='application/pdf',
|
||||
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
|
||||
upload_file=File(open(TEST_FILE_CERFA_DIA, 'r')),
|
||||
upload_status='pending',
|
||||
connecteur=None,
|
||||
collectivite=None
|
||||
)
|
||||
regex = r"^to_openADS__%s__%s\.pdf$" % (
|
||||
'[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'ffdf')
|
||||
assert re.search(regex, get_upload_path(ff))
|
||||
|
||||
|
||||
def test_get_file_extension():
|
||||
assert get_file_extension('afile.pdf') == '.pdf'
|
||||
assert get_file_extension('afile', 'application/pdf') == '.pdf'
|
||||
assert get_file_extension('') == ''
|
||||
assert get_file_extension('afile') == ''
|
||||
|
||||
|
||||
def test_trunc_str_values():
|
||||
d = {}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 0) == {'a': u'…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 1) == {'a': u'1…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 2) == {'a': u'12…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 9) == {'a': u'123456789'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
|
||||
|
||||
|
||||
def test_dict_dumper():
|
||||
d = {}
|
||||
|
||||
dd = DictDumper(d, use_json_dumps=False)
|
||||
assert repr(dd) == (u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
|
||||
d, dd.max_str_len, dd.use_json_dumps)).encode('utf-8')
|
||||
assert str(dd) == '{}'
|
||||
assert unicode(dd) == u'{}'
|
||||
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 0, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
|
||||
d = {'a': '123456789'}
|
||||
dd = DictDumper(d, 10, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 5, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == unicode({'a': u'12345…'})
|
||||
dd = DictDumper(d, 5, use_json_dumps=True)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == u'{"a": "12345\\u2026"}'
|
||||
|
||||
|
||||
def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet):
|
||||
ff = ForwardFile(
|
||||
numero_demande='45641531',
|
||||
numero_dossier=FAKE_NUMERO_DOSSIER,
|
||||
type_fichier='CERFA',
|
||||
orig_filename=os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
content_type='application/pdf',
|
||||
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
|
||||
upload_file=ContentFile('toto'),
|
||||
upload_status='pending',
|
||||
connecteur=atreal_openads,
|
||||
collectivite=None
|
||||
)
|
||||
|
||||
assert ff.get_verbose_name() == 'Forward File'
|
||||
assert ff.get_verbose_name_plural() == 'Forward Files'
|
||||
|
||||
assert ff.get_class_name() == 'ForwardFile'
|
||||
assert ff.get_class_name_plural() == 'ForwardFiles'
|
||||
|
||||
assert ff.get_class_name_dash_case() == 'forward-file'
|
||||
assert ff.get_class_name_plural_dash_case() == 'forward-files'
|
||||
|
||||
assert ff.get_class_name_title() == 'Forward File'
|
||||
assert ff.get_class_name_plural_title() == 'Forward Files'
|
||||
|
||||
assert ff.get_url_name('list', plural=True) == 'list-forward-files'
|
||||
|
||||
assert ff.get_absolute_url() == '/manage/atreal-openads/atreal/forward-file/None'
|
||||
assert ff.get_edit_url() == '/manage/atreal-openads/atreal/edit-forward-file/None'
|
||||
assert ff.get_delete_url() == '/manage/atreal-openads/atreal/delete-forward-file/None'
|
||||
assert ff.get_list_url() == '/manage/atreal-openads/atreal/forward-files'
|
||||
|
||||
assert atreal_openads.get_class_name_plural() == 'AtrealOpenads'
|
||||
|
||||
assert atreal_openads.get_url_name('view') == 'view-connector'
|
||||
|
||||
params = atreal_openads.get_url_params(True)
|
||||
assert params['connector'] == 'atreal-openads'
|
||||
assert params['slug'] == atreal_openads.slug
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
atreal_openads.get_list_url()
|
||||
assert unicode(e.value) == u"AtrealOpenads:get_list_url() method should not be called"
|
||||
|
||||
# TODO add more collectivite test cases
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
collectivite_1_guichet.get_list_url()
|
||||
assert unicode(e.value) == u"Guichet:get_list_url() method should not be called"
|
||||
|
|
@ -0,0 +1,413 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pytest
|
||||
import os
|
||||
import base64
|
||||
import datetime
|
||||
|
||||
from django.http.request import HttpRequest, QueryDict
|
||||
from django.urls.base import resolve
|
||||
from django.core.files import File
|
||||
|
||||
from atreal_openads.views import (
|
||||
get_connecteur_from_request,
|
||||
get_collectivite_from_request,
|
||||
AtrealOpenadsView,
|
||||
ForwardFileView,
|
||||
ForwardFileListView,
|
||||
ForwardFileUpdateView,
|
||||
ForwardFileDeleteView,
|
||||
CollectiviteView,
|
||||
CollectiviteListView,
|
||||
CollectiviteCreateView,
|
||||
CollectiviteUpdateView,
|
||||
CollectiviteDeleteView,
|
||||
GuichetView,
|
||||
GuichetCreateView,
|
||||
GuichetUpdateView,
|
||||
GuichetDeleteView
|
||||
)
|
||||
|
||||
from atreal_openads.models import (
|
||||
ForwardFile,
|
||||
Guichet,
|
||||
Collectivite,
|
||||
AtrealOpenads
|
||||
)
|
||||
|
||||
|
||||
CONNECTOR_NAME = 'atreal-openads'
|
||||
CONNECTOR_SLUG = 'atreal'
|
||||
COLLECTIVITE = 79
|
||||
OPENADS_API_LOGIN = 'publik-passerelle'
|
||||
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
|
||||
OPENADS_API_URL = 'http://openads.api/'
|
||||
|
||||
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
|
||||
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
|
||||
|
||||
TESTS_DIR = os.path.dirname(__file__)
|
||||
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
|
||||
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
|
||||
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atreal_openads(db):
|
||||
return AtrealOpenads.objects.create(
|
||||
slug = CONNECTOR_SLUG,
|
||||
default_collectivite_openADS_id = COLLECTIVITE,
|
||||
openADS_API_url = OPENADS_API_URL,
|
||||
basic_auth_username = OPENADS_API_LOGIN,
|
||||
basic_auth_password = OPENADS_API_PASSWORD
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1(db, atreal_openads):
|
||||
return Collectivite.objects.create(
|
||||
name = u'Macollectivité',
|
||||
connecteur = atreal_openads,
|
||||
openADS_id = '3'
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
|
||||
return Guichet.objects.create(
|
||||
collectivite = collectivite_1,
|
||||
ouverture_jour_h = datetime.time(9, 0),
|
||||
fermeture_jour_h = datetime.time(17, 0),
|
||||
ouverture_sem_d = 1, # Lundi
|
||||
fermeture_sem_d = 5, # Vendredi
|
||||
ouverture_sem_h = datetime.time(8, 30),
|
||||
fermeture_sem_h = datetime.time(12, 15)
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def forwardfile_1(db, atreal_openads, collectivite_1):
|
||||
return ForwardFile.objects.create(
|
||||
connecteur = atreal_openads,
|
||||
collectivite = collectivite_1,
|
||||
numero_demande = '45641531',
|
||||
numero_dossier = FAKE_NUMERO_DOSSIER,
|
||||
type_fichier = 'CERFA',
|
||||
orig_filename = os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
content_type = 'application/pdf',
|
||||
file_hash = 'ffdf456fdsvgb4bgfb6g4f5b',
|
||||
upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')),
|
||||
upload_status = 'pending'
|
||||
)
|
||||
|
||||
|
||||
def test_get_connecteur_from_request(atreal_openads, forwardfile_1):
|
||||
req = HttpRequest()
|
||||
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
|
||||
atreal_openads.slug, forwardfile_1.id)
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
req.resolver_match = resolve(req.path)
|
||||
|
||||
view = ForwardFileView()
|
||||
view.request = req
|
||||
|
||||
connecteur = get_connecteur_from_request(view)
|
||||
assert connecteur is not None
|
||||
assert connecteur.slug == atreal_openads.slug
|
||||
|
||||
|
||||
def test_get_collectivite_from_request(atreal_openads, collectivite_1):
|
||||
req = HttpRequest()
|
||||
req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
req.resolver_match = resolve(req.path)
|
||||
|
||||
view = ForwardFileListView()
|
||||
view.request = req
|
||||
|
||||
collectivite = get_collectivite_from_request(view)
|
||||
assert collectivite is not None
|
||||
assert collectivite.id == collectivite_1.id
|
||||
|
||||
|
||||
def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
|
||||
req = HttpRequest()
|
||||
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
|
||||
atreal_openads.slug, forwardfile_1.id)
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
req.resolver_match = resolve(req.path)
|
||||
|
||||
view = ForwardFileView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
|
||||
view = ForwardFileUpdateView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/forward-file/%s' % (
|
||||
atreal_openads.slug, forwardfile_1.id)
|
||||
req.GET['back-to'] = 'list-forward-files'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
|
||||
req.GET['back-to'] = 'col-list-forward-files'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
|
||||
view = ForwardFileDeleteView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
del(req.GET['back-to'])
|
||||
url = view.get_success_url()
|
||||
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
|
||||
req.GET['back-to'] = 'list-forward-files'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
|
||||
req.GET['back-to'] = 'col-list-forward-files'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
|
||||
req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
req.resolver_match = resolve(req.path)
|
||||
view = ForwardFileListView()
|
||||
view.request = req
|
||||
view.object_list = []
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
assert context['collectivite'].id == collectivite_1.id
|
||||
|
||||
qs = view.get_queryset()
|
||||
assert qs.query is not None
|
||||
assert qs.query.order_by == ['id']
|
||||
assert qs.query.default_ordering == True
|
||||
assert qs.query.get_meta().ordering == ['-last_update_datetime']
|
||||
assert qs.ordered
|
||||
|
||||
req.GET['order-by'] = '-id'
|
||||
qs = view.get_queryset()
|
||||
assert qs.query is not None
|
||||
assert qs.query.order_by == ['-id']
|
||||
assert qs.query.default_ordering == True
|
||||
|
||||
req.path = '/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
|
||||
req.resolver_match = resolve(req.path)
|
||||
del(req.GET['back-to'])
|
||||
del(req.GET['order-by'])
|
||||
view = ForwardFileListView()
|
||||
view.request = req
|
||||
view.object_list = []
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
|
||||
qs = view.get_queryset()
|
||||
assert qs.query is not None
|
||||
assert qs.query.order_by == ['id']
|
||||
assert qs.query.default_ordering == True
|
||||
assert qs.query.get_meta().ordering == ['-last_update_datetime']
|
||||
assert qs.ordered
|
||||
|
||||
|
||||
def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
|
||||
req = HttpRequest()
|
||||
req.path = '/manage/atreal-openads/%s/collectivite/%s' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
req.resolver_match = resolve(req.path)
|
||||
|
||||
view = CollectiviteView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
assert context['guichet_add_url'] == u'/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
assert context['forward_files_list_url'] == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
|
||||
view = CollectiviteUpdateView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivite/%s' % (
|
||||
atreal_openads.slug, collectivite_1.id)
|
||||
req.GET['back-to'] = 'list-collectivites'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
|
||||
|
||||
view = CollectiviteDeleteView()
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
del(req.GET['back-to'])
|
||||
url = view.get_success_url()
|
||||
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
|
||||
req.GET['back-to'] = 'list-collectivites'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
|
||||
|
||||
view = CollectiviteCreateView()
|
||||
req.path = '/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
|
||||
req.resolver_match = resolve(req.path)
|
||||
view.request = req
|
||||
view.object = None
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
kwargs = view.get_form_kwargs()
|
||||
assert kwargs['connecteur'].slug == atreal_openads.slug
|
||||
del(req.GET['back-to'])
|
||||
url = view.get_success_url()
|
||||
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
|
||||
req.GET['back-to'] = 'list-collectivites'
|
||||
url = view.get_success_url()
|
||||
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
|
||||
|
||||
req.path = '/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
|
||||
req.resolver_match = resolve(req.path)
|
||||
view = CollectiviteListView()
|
||||
view.request = req
|
||||
view.object_list = []
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['connecteur'].slug == atreal_openads.slug
|
||||
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
|
||||
|
||||
qs = view.get_queryset()
|
||||
assert qs.query is not None
|
||||
assert qs.query.order_by == ['id']
|
||||
assert qs.query.default_ordering == True
|
||||
assert qs.query.get_meta().ordering == ['name']
|
||||
assert qs.ordered
|
||||
|
||||
req.GET['order-by'] = '-id'
|
||||
qs = view.get_queryset()
|
||||
assert qs.query is not None
|
||||
assert qs.query.order_by == ['-id']
|
||||
assert qs.query.default_ordering == True
|
||||
|
||||
|
||||
def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet):
    """Exercise the CRUD views attached to a 'guichet' of a 'collectivite'."""
    # Build a bare GET request by hand (no Django test client involved).
    req = HttpRequest()
    req.path = '/manage/atreal-openads/%s/collectivite/%s/guichet/%s' % (
        atreal_openads.slug, collectivite_1.id, collectivite_1_guichet.id)
    req.method = 'GET'
    req.encoding = 'utf-8'
    req.GET = QueryDict(mutable=True)  # required because of encoding setter
    req.POST = QueryDict(mutable=True)  # required because of encoding setter
    req.content_params = None
    req.COOKIES = {}
    req.META = {}
    req._read_started = False
    req.resolver_match = resolve(req.path)

    # The detail, update and delete views share the same context contract:
    # both the connector and the 'collectivite' must be exposed.
    last_view = None
    for view_class in (GuichetView, GuichetUpdateView, GuichetDeleteView):
        view = view_class()
        view.request = req
        view.object = None
        view.kwargs = req.resolver_match.kwargs
        ctx = view.get_context_data()
        assert ctx['connecteur'].slug == atreal_openads.slug
        assert ctx['collectivite'].id == collectivite_1.id
        last_view = view

    # Deleting a guichet must send the user back to its 'collectivite' page.
    assert last_view.get_success_url() == u'/manage/atreal-openads/%s/collectivite/%s' % (
        atreal_openads.slug, collectivite_1.id)

    # The create view resolves against the 'create-guichet' URL instead.
    view = GuichetCreateView()
    req.path = '/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
        atreal_openads.slug, collectivite_1.id)
    req.resolver_match = resolve(req.path)
    view.request = req
    view.object = None
    view.kwargs = req.resolver_match.kwargs
    ctx = view.get_context_data()
    assert ctx['connecteur'].slug == atreal_openads.slug
    assert ctx['collectivite'].id == collectivite_1.id

    # The form must receive the 'collectivite' so the new guichet is attached to it.
    form_kwargs = view.get_form_kwargs()
    assert form_kwargs['collectivite'].id == collectivite_1.id

    # Successful creation also redirects back to the 'collectivite' page.
    assert view.get_success_url() == u'/manage/atreal-openads/%s/collectivite/%s' % (
        atreal_openads.slug, collectivite_1.id)
|
||||
|
||||
|
||||
def test_connecteur_view(atreal_openads):
|
||||
req = HttpRequest()
|
||||
req.path = '/atreal-openads/%s/' % atreal_openads.slug
|
||||
req.method = 'GET'
|
||||
req.encoding = 'utf-8'
|
||||
req.GET = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.POST = QueryDict(mutable=True) # required because of encoding setter
|
||||
req.content_params = None
|
||||
req.COOKIES = {}
|
||||
req.META = {}
|
||||
req._read_started = False
|
||||
req.resolver_match = resolve(req.path)
|
||||
|
||||
view = AtrealOpenadsView()
|
||||
view.request = req
|
||||
view.object = atreal_openads
|
||||
view.kwargs = req.resolver_match.kwargs
|
||||
context = view.get_context_data()
|
||||
assert context['collectivite_fields'] == Collectivite.get_fields()
|
||||
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
|
||||
|
Reference in New Issue