Added 'guichet' (planned availability), added CRUD views for each entity, many fixes and refactoring

Features:
    * A connector now can be associated with 'collectivites',
      and each of them can have one 'guichet'.
      A 'collectivite' have a name and an openADS technical identifier
      A 'guichet' allow to define hours and days for when its 'collectivite' is "open"
    * Each of those entities have a view to CRUD it

    * For entity ForwardFile:
       - Added association with a connector, and eventually a 'collectivite'
       - Added ForwardFiles 'size' field
       - Added updating fields 'content_type', 'file_hash' and 'size' on save()
       - Added custom validation on save()

    * For entity AtrealOpenads:
       - Added permissions on each endpoint
       - Added 'email' field support for each type of 'demandeur'
       - 'numero_dossier' is now required in the url for some endpoints (previously was a GET param)
       - Added 'upload_user_files' as an endpoint (but can still be used as a method with request=None)
       - The 'upload_user_files()' method now only handle forward file that have status='pending'

Fixes:
    * Added "*args" and "**kwargs" arguments to some connector endpoint methods
    * Added 'verbose_name' and 'ordering' on each entity META
    * Added decorator '@force_encoded_string_output' to prevent utf-8 issues with python2
    * Added __repr__, __str__ and __unicode__ functions to each entities
    * Added database indexes for each entity
    * Commented out useless JSON schema imports
    * Removed unused variable assignations

Refactoring:
    * Moved utilities functions to utils.py file
    * Added a BaseModel to provide some default functions for Models
    * Added enum/translations of hard-coded values for ForwardFile 'upload_status' field

Tests:
    * Added test for each entity
    * Added tests files specific to utilities, forms and views
    * Total code coverage is 99% with only 10 statement missed
This commit is contained in:
Michael Bideau 2019-08-20 17:00:12 +00:00
parent e8de2e66e8
commit 990f1fb7bc
20 changed files with 3033 additions and 370 deletions

67
atreal_openads/forms.py Normal file
View File

@ -0,0 +1,67 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from django.forms import ModelForm
from .models import ForwardFile, Collectivite, Guichet
class ForwardFileForm(ModelForm):
class Meta:
model = ForwardFile
exclude = ['connecteur', 'size', 'file_hash']
def __init__(self, *args, **kwargs):
connecteur = kwargs.pop('connecteur' , None)
collectivite = kwargs.pop('collectivite', None)
super(ForwardFileForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur
):
self.instance.connecteur = connecteur
if (
(not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite
):
self.instance.collectivite = collectivite
# only allow to select a 'collectivite' that belongs to the connecteur
if hasattr(self.instance, 'connecteur') and self.instance.connecteur:
self.fields['collectivite'].queryset = Collectivite.objects.filter(connecteur=self.instance.connecteur)
# TODO if the status is 'uploading' make everything read-only
class CollectiviteForm(ModelForm):
class Meta:
model = Collectivite
exclude = ['connecteur']
def __init__(self, *args, **kwargs):
connecteur = kwargs.pop('connecteur', None)
super(CollectiviteForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur
):
self.instance.connecteur = connecteur
class GuichetForm(ModelForm):
class Meta:
model = Guichet
exclude = ['collectivite']
def __init__(self, *args, **kwargs):
collectivite = kwargs.pop('collectivite', None)
super(GuichetForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite
):
self.instance.collectivite = collectivite

View File

@ -1,9 +1,10 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.15 on 2019-07-18 20:51
# Generated by Django 1.11.18 on 2019-08-20 15:02
from __future__ import unicode_literals
import atreal_openads.models
import atreal_openads.utils
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
@ -28,13 +29,30 @@ class Migration(migrations.Migration):
('trusted_certificate_authorities', models.FileField(blank=True, null=True, upload_to=b'', verbose_name='TLS trusted CAs')),
('verify_cert', models.BooleanField(default=True, verbose_name='TLS verify certificates')),
('http_proxy', models.CharField(blank=True, max_length=128, verbose_name='HTTP and HTTPS proxy')),
('collectivite', models.CharField(blank=True, default=b'', help_text='ex: Marseille, or ex: 3', max_length=255, verbose_name='Collectivity (identifier)')),
('default_collectivite_openADS_id', models.PositiveIntegerField(blank=True, default=0, help_text='ex: 3', verbose_name="Default 'collectivite' (identifier in openADS)")),
('openADS_API_url', models.URLField(default=b'', help_text='ex: https://openads.your_domain.net/api/', max_length=255, verbose_name='openADS API URL')),
('users', models.ManyToManyField(blank=True, related_name='_atrealopenads_users_+', related_query_name='+', to='base.ApiUser')),
],
options={
'ordering': ['openADS_API_url'],
'verbose_name': 'openADS',
'verbose_name_plural': 'openADS',
},
bases=(models.Model, atreal_openads.utils.BaseModel),
),
migrations.CreateModel(
name='Collectivite',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(blank=True, default=b'', max_length=150)),
('openADS_id', models.PositiveIntegerField(help_text='ex: 3', verbose_name='openADS identifier')),
('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='collectivites', related_query_name='collectivite', to='atreal_openads.AtrealOpenads')),
],
options={
'ordering': ['name'],
'verbose_name': 'Collectivite',
},
bases=(models.Model, atreal_openads.utils.BaseModel),
),
migrations.CreateModel(
name='ForwardFile',
@ -46,11 +64,90 @@ class Migration(migrations.Migration):
('file_hash', models.CharField(blank=True, default=b'', max_length=100)),
('orig_filename', models.CharField(blank=True, default=b'', max_length=100)),
('content_type', models.CharField(blank=True, default=b'', max_length=100)),
('upload_file', models.FileField(null=True, upload_to=atreal_openads.models.get_upload_path)),
('size', models.PositiveIntegerField(default=0)),
('upload_file', models.FileField(blank=True, null=True, upload_to=atreal_openads.utils.get_upload_path)),
('upload_attempt', models.PositiveIntegerField(blank=True, default=0)),
('upload_status', models.CharField(blank=True, default=b'', max_length=10)),
('upload_status', models.CharField(choices=[(b'pending', 'Pending'), (b'uploading', 'Uploading'), (b'failed', 'Failed'), (b'success', 'Success')], default=b'pending', max_length=10)),
('upload_msg', models.CharField(blank=True, default=b'', max_length=255)),
('last_update_datetime', models.DateTimeField(auto_now=True)),
('collectivite', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.Collectivite')),
('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.AtrealOpenads')),
],
options={
'ordering': ['-last_update_datetime'],
'verbose_name': 'Forward File',
},
bases=(models.Model, atreal_openads.utils.BaseModel),
),
migrations.CreateModel(
name='Guichet',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('ouverture_jour_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each day)')),
('fermeture_jour_h', models.TimeField(help_text='ex: 17:00', verbose_name='Hour of closing (each day)')),
('ouverture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=1, help_text='ex: Lundi', verbose_name='Day of opening (each week)')),
('fermeture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=6, help_text='ex: Samedi', verbose_name='Day of closing (each week)')),
('ouverture_sem_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each week)')),
('fermeture_sem_h', models.TimeField(help_text='ex: 12:15', verbose_name='Hour of closing (each week)')),
('collectivite', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='guichet', to='atreal_openads.Collectivite')),
],
options={
'ordering': ['collectivite'],
'verbose_name': 'Guichet',
'verbose_name_plural': 'Guichets Urbanisme',
},
bases=(models.Model, atreal_openads.utils.BaseModel),
),
migrations.AddIndex(
model_name='guichet',
index=models.Index(fields=[b'collectivite'], name=b'su_collectivite_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'connecteur'], name=b'ff_connecteur_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'collectivite'], name=b'ff_collectivite_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'numero_demande', b'numero_dossier'], name=b'ff_deman_doss_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'numero_demande'], name=b'ff_demande_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'numero_dossier'], name=b'ff_dossier_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'orig_filename'], name=b'ff_filename_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'upload_status'], name=b'ff_status_idx'),
),
migrations.AddIndex(
model_name='forwardfile',
index=models.Index(fields=[b'last_update_datetime'], name=b'ff_last_up_dt_idx'),
),
migrations.AddIndex(
model_name='collectivite',
index=models.Index(fields=[b'connecteur', b'openADS_id'], name=b'col_conn_openADSid_idx'),
),
migrations.AddIndex(
model_name='collectivite',
index=models.Index(fields=[b'connecteur'], name=b'col_connecteur_idx'),
),
migrations.AddIndex(
model_name='collectivite',
index=models.Index(fields=[b'openADS_id'], name=b'col_openADS_id_idx'),
),
migrations.AlterUniqueTogether(
name='collectivite',
unique_together=set([('connecteur', 'openADS_id')]),
),
]

View File

@ -24,161 +24,374 @@ import datetime
import os
import re
import magic
import hashlib
import copy
from HTMLParser import HTMLParser
from django.db import models
from django.http import Http404
from django.utils.translation import ugettext_lazy as _
from django.core.files import File
from django.core.files.base import ContentFile
from django.core.exceptions import ValidationError
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from .json_schemas import (
JSON_SCHEMA_CHECK_STATUS_OUT,
JSON_SCHEMA_CREATE_DOSSIER_IN,
JSON_SCHEMA_CREATE_DOSSIER_OUT,
JSON_SCHEMA_GET_DOSSIER_OUT,
JSON_SCHEMA_GET_FWD_FILES_OUT,
JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT,
JSON_SCHEMA_GET_COURRIER_OUT
#JSON_SCHEMA_CHECK_STATUS_OUT,
#JSON_SCHEMA_CREATE_DOSSIER_OUT,
#JSON_SCHEMA_GET_DOSSIER_OUT,
#JSON_SCHEMA_GET_FWD_FILES_OUT,
#JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT,
#JSON_SCHEMA_GET_COURRIER_OUT
)
from .utils import (
force_encoded_string_output,
strip_tags,
clean_spaces,
normalize,
get_file_digest,
get_upload_path,
get_file_extension,
trunc_str_values,
DictDumper,
BaseModel
)
class MLStripper(HTMLParser):
"""HTML parser that removes html tags."""
def __init__(self):
self.reset()
self.fed = []
def handle_data(self, d):
self.fed.append(d)
def get_data(self):
return ''.join(self.fed)
def strip_tags(html):
"""Remove html tags from a string."""
s = MLStripper()
s.feed(html)
return s.get_data()
def clean_spaces(text):
"""Remove extra spaces an line breaks from a string."""
text = text.replace('\n', ' ')
text = text.replace('\r', ' ')
text = text.replace('\t', ' ')
text = text.replace('\\n', ' ')
text = text.replace('\\r', ' ')
text = text.replace('\\t', ' ')
return re.sub(r' +', ' ', text).strip()
def normalize(value):
"""Normalize a value to be send to openADS.API."""
if value is None:
return ''
if not isinstance(value, unicode):
value = unicode(value)
return clean_spaces(value)
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
def get_upload_path(instance, filename=None):
"""Return a relative upload path for a file."""
# be careful:
# * openADS accept only filename less than 50 chars
# * name should be unique, even if the content is the same
return 'pass_openADS_up_%s_%s' % (
datetime.datetime.now().strftime('%Y-%b-%d_%Hh%Mm%Ss%f'),
instance.file_hash[:4]
)
def trunc_str_values(value, limit, visited=None, truncate_text=u''):
"""Truncate a string value (not dict keys) and append a truncate text."""
if visited is None:
visited = []
if not value in visited:
if isinstance(value, basestring) and len(value) > limit:
value = value[:limit] + truncate_text
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
visited.append(value)
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
for k,v in iterator:
value[k] = trunc_str_values(v, limit, visited, truncate_text)
return value
class DictDumper(object):
"""Helper to dump a dictionary to a string representation with lazy processing.
Only applied when dict is converted to string (lazy processing):
- long strings truncated (after the dict has been 'deep' copied)
- (optionaly) dict converted with json.dumps instead of unicode().
"""
def __init__(self, dic, max_str_len=255, use_json_dumps=True):
""" arguments:
- dic string the dict to dump
- max_str_len integer the maximul length of string values
- use_json_dumps boolean True to use json.dumps() else it uses unicode()
"""
self.dic = dic
self.max_str_len = max_str_len
self.use_json_dumps = use_json_dumps
def __str__(self):
dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
return unicode(dict_ref)
class ForwardFile(models.Model):
class ForwardFile(models.Model, BaseModel):
"""Represent a file uploaded by a user, to be forwarded to openADS.API."""
STATUSES = [
('pending' , _('Pending')),
('uploading', _('Uploading')),
('failed' , _('Failed')),
('success' , _('Success'))
]
connecteur = models.ForeignKey('AtrealOpenads',
on_delete=models.CASCADE,
related_name="forward_files",
related_query_name="forward_file")
collectivite = models.ForeignKey('Collectivite', blank=True, null=True,
on_delete=models.CASCADE,
related_name="forward_files",
related_query_name="forward_file")
numero_demande = models.CharField(max_length=20)
numero_dossier = models.CharField(max_length=20)
type_fichier = models.CharField(max_length=10)
file_hash = models.CharField(max_length=100, default='', blank=True)
orig_filename = models.CharField(max_length=100, default='', blank=True)
content_type = models.CharField(max_length=100, default='', blank=True)
upload_file = models.FileField(upload_to=get_upload_path, null=True)
size = models.PositiveIntegerField(default=0)
upload_file = models.FileField(upload_to=get_upload_path, blank=True, null=True)
upload_attempt = models.PositiveIntegerField(default=0, blank=True)
upload_status = models.CharField(max_length=10, default='', blank=True)
upload_status = models.CharField(max_length=10, choices=STATUSES, default='pending')
upload_msg = models.CharField(max_length=255, default='', blank=True)
last_update_datetime = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = _('Forward File')
indexes = [
models.Index(fields=['connecteur'] , name='ff_connecteur_idx'),
models.Index(fields=['collectivite'] , name='ff_collectivite_idx'),
models.Index(fields=['numero_demande', 'numero_dossier'], name='ff_deman_doss_idx'),
models.Index(fields=['numero_demande'], name='ff_demande_idx'),
models.Index(fields=['numero_dossier'], name='ff_dossier_idx'),
models.Index(fields=['orig_filename'] , name='ff_filename_idx'),
models.Index(fields=['upload_status'] , name='ff_status_idx'),
models.Index(fields=['last_update_datetime'], name='ff_last_up_dt_idx')
]
ordering = ['-last_update_datetime']
class AtrealOpenads(BaseResource, HTTPResource):
def get_status(self, status_codename=None):
"""Return the upload status human name translated.
If specified codename is not found, return it.
"""
if not status_codename:
status_codename = self.upload_status
for st in self.STATUSES:
if st[0] == status_codename:
return st[1]
return status_codename
@force_encoded_string_output
def __repr__(self):
return u'ForwardFile(id=%s,connecteur=%s,collectivite=%s,demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % (
self.id,
unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
self.numero_demande, self.numero_dossier,
self.type_fichier, self.orig_filename, self.upload_status)
def __unicode__(self):
return u"%s[%s]" % (trunc_str_values(self.orig_filename, 20), self.get_status())
def get_url_params(self, *args, **kwargs):
params = super(ForwardFile, self).get_url_params(*args, **kwargs)
params['connecteur'] = self.connecteur.slug if self.connecteur else None
return params
def update_content_type(self, only_if_empty=False):
"""Update the content type from the content of the file."""
if not self.content_type or not only_if_empty:
if self.upload_file and self.upload_file.size:
self.content_type = magic.from_buffer(self.upload_file.read(1024), mime=True)
else:
self.content_type = ''
def update_file_hash(self, only_if_empty=False):
"""Update the file_hash field from the content of the file."""
if not self.file_hash or not only_if_empty:
if self.upload_file and self.upload_file.size:
self.file_hash = get_file_digest(self.upload_file)
else:
self.file_hash = ''
# preprocessing data and validate model before saving
# /!\ Attention: this will not be triggered when doing bulk actions like with QuerySet.update()
# @see: https://docs.djangoproject.com/en/2.2/topics/db/models/#overriding-predefined-model-methods
# The note entitled "Overridden model methods are not called on bulk operations"
def save(self, *args, **kwargs):
# delete file content (on success)
if self.upload_status == 'success':
if self.upload_file and self.upload_file.size > 0:
self.upload_file.delete()
# else, update metadata
else:
self.size = self.upload_file.size if self.upload_file else 0
self.update_file_hash()
self.update_content_type(only_if_empty=True)
# validation (calling self.clean())
self.full_clean()
super(ForwardFile, self).save(*args, **kwargs)
# check that one the following fields must not be blank/null:
# 'file_hash', 'orig_filename', 'upload_file'
# because if they are all empty we dont have any usefull information about the upload
def clean(self, *args, **kwargs):
ret = super(ForwardFile, self).clean(*args, **kwargs)
if (not self.file_hash
and not self.orig_filename
and (not self.upload_file or not self.upload_file.size)
):
raise ValidationError(
_("A %s cannot have all the following fields empty: %s." % (
self.get_verbose_name(),
['file_hash', 'orig_filename', 'upload_file'])
)
)
return ret
class Collectivite(models.Model, BaseModel):
"""Represent a "collectivite"."""
name = models.CharField(max_length=150, default='', blank=True)
connecteur = models.ForeignKey('AtrealOpenads',
on_delete=models.CASCADE,
related_name="collectivites",
related_query_name="collectivite")
openADS_id = models.PositiveIntegerField(_('openADS identifier'), help_text=_('ex: 3'))
# 'guichet' will be a property provided by the one-to-one relation of Guichet
# 'forward_files' will be a property provided by the related_name of the foreignKey
class Meta:
verbose_name = _('Collectivite')
unique_together = ['connecteur', 'openADS_id']
indexes = [
models.Index(fields=['connecteur', 'openADS_id'], name='col_conn_openADSid_idx'),
models.Index(fields=['connecteur'], name='col_connecteur_idx'),
models.Index(fields=['openADS_id'], name='col_openADS_id_idx')
]
ordering = ['name']
@classmethod
def get_fields(cls, *args, **kwargs):
# get_fields() return is immutable, hence the copy
fields = [f for f in super(Collectivite, cls).get_fields(*args, **kwargs)]
# moving related fields field at the end of the list
if fields:
rels = []
for rel_name in ['forward_file', 'guichet']:
if (fields[0]
and hasattr(fields[0], 'name')
and fields[0].name == rel_name
):
rels.append(fields.pop(0))
for rel in reversed(rels):
fields.append(rel)
return fields
@force_encoded_string_output
def __repr__(self):
return u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % (
self.id, unicode(self.name),
unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
self.openADS_id,
unicode(self.guichet) if hasattr(self, 'guichet') else None)
def __unicode__(self):
return self.name if isinstance(self.name, unicode) else unicode(self.name)
def get_fields_kv(self, *args, **kwargs):
fields = super(Collectivite, self).get_fields_kv(*args, **kwargs)
# moving related fields field at the end of the list
if fields:
rels = []
for rel_name in ['forward_file', 'guichet']:
if (fields[0] and fields[0][0]
and hasattr(fields[0][0], 'name')
and fields[0][0].name == rel_name
):
rels.append(fields.pop(0))
for rel in reversed(rels):
fields.append(rel)
return fields
def get_url_params(self, *args, **kwargs):
params = super(Collectivite, self).get_url_params(*args, **kwargs)
params['connecteur'] = self.connecteur.slug if self.connecteur else None
return params
class Guichet(models.Model, BaseModel):
"""Represent a "Guichet"."""
DAYS = [
(1, _('Monday')),
(2, _('Tuesday')),
(3, _('Wednesday')),
(4, _('Thursday')),
(5, _('Friday')),
(6, _('Saturday')),
(7, _('Sunday'))
]
collectivite = models.OneToOneField('Collectivite',
on_delete=models.CASCADE,
related_name="guichet")
ouverture_jour_h = models.TimeField(_('Hour of opening (each day)'), help_text=_('ex: 08:30'))
fermeture_jour_h = models.TimeField(_('Hour of closing (each day)'), help_text=_('ex: 17:00'))
ouverture_sem_d = models.PositiveIntegerField(_('Day of opening (each week)'), help_text=_('ex: Lundi'), choices=DAYS, default=1)
fermeture_sem_d = models.PositiveIntegerField(_('Day of closing (each week)'), help_text=_('ex: Samedi'), choices=DAYS, default=6)
ouverture_sem_h = models.TimeField(_('Hour of opening (each week)'), help_text=_('ex: 08:30'))
fermeture_sem_h = models.TimeField(_('Hour of closing (each week)'), help_text=_('ex: 12:15'))
class Meta:
verbose_name = _('Guichet')
verbose_name_plural = _('Guichets Urbanisme')
indexes = [
models.Index(fields=['collectivite'], name='su_collectivite_idx')
]
ordering = ['collectivite']
@force_encoded_string_output
def __repr__(self):
return u'Guichet(id=%s,collectivite=%s,%s)' % (
self.id,
unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
unicode(self))
def __unicode__(self):
return u'%s %s -> %s %s [%s/%s]' % (
unicode(self.DAYS[self.ouverture_sem_d - 1][1]),
self.ouverture_sem_h.strftime('%H:%M') if self.ouverture_sem_h else None,
unicode(self.DAYS[self.fermeture_sem_d - 1][1]),
self.fermeture_sem_h.strftime('%H:%M') if self.fermeture_sem_h else None,
self.ouverture_jour_h.strftime('%H:%M') if self.ouverture_jour_h else None,
self.fermeture_jour_h.strftime('%H:%M') if self.fermeture_jour_h else None)
def get_url_params(self, *args, **kwargs):
params = super(Guichet, self).get_url_params(*args, **kwargs)
params['collectivite'] = self.collectivite.id if self.collectivite else None
params['connecteur'] = self.collectivite.connecteur.slug if self.collectivite else None
return params
def get_list_url(self):
raise Exception(u"Guichet:get_list_url() method should not be called")
# @raise TypeError if argument is not a datetime object
def is_open(self, dt):
""" Return 'True' if the "Guichet" is open, else False."""
if dt:
if not isinstance(dt, datetime.datetime):
raise TypeError(u"is_open() expect a datetime object (not a %s)" % type(dt))
ouverture_jour_dt = datetime.datetime.combine(dt, self.ouverture_jour_h)
fermeture_jour_dt = datetime.datetime.combine(dt, self.fermeture_jour_h)
day = dt.isoweekday()
return (
# opening day
(day == self.ouverture_sem_d
and dt.time() > self.ouverture_sem_h and dt < fermeture_jour_dt)
# closing day
or (day == self.fermeture_sem_d
and dt.time() < self.fermeture_sem_h and dt > ouverture_jour_dt)
# regular days
or ( day > self.ouverture_sem_d
and day < self.fermeture_sem_d
and dt > ouverture_jour_dt
and dt < fermeture_jour_dt
)
)
return False
class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
"""API that proxy/relay communications with/to openADS."""
collectivite = models.CharField(_('Collectivity (identifier)'), max_length=255,
help_text=_('ex: Marseille, or ex: 3'), default='', blank=True)
default_collectivite_openADS_id = models.PositiveIntegerField(_("Default 'collectivite' (identifier in openADS)"),
help_text=_('ex: 3'), default=0, blank=True)
openADS_API_url = models.URLField(_('openADS API URL'), max_length=255,
help_text=_('ex: https://openads.your_domain.net/api/'), default='')
openADS_API_timeout = 3600
category = _('Business Process Connectors')
# 'collectivites' will be a property provided by the related_name of the foreignKey
# 'forward_files' will be a property provided by the related_name of the foreignKey
api_description = _('''This API provides exchanges with openADS.''')
category = _('Business Process Connectors')
class Meta:
verbose_name = _('openADS')
verbose_name_plural = _('openADS')
ordering = ['openADS_API_url']
@classmethod
def get_class_name_plural(cls, *args, **kwargs):
return cls.get_class_name(*args, **kwargs)
@force_encoded_string_output
def __repr__(self):
return u'AtrealOpenads(id=%s,openADS=%s,login=%s,collectivites=%s,default=%s)' % (
self.id,
unicode(self.openADS_API_url),
unicode(self.basic_auth_username),
self.collectivites.count(),
self.default_collectivite_openADS_id)
def __unicode__(self):
return self.slug if isinstance(self.slug, unicode) else unicode(self.slug)
def get_url_name(self, prefix=''):
return '%s%s' % (prefix + '-' if prefix else '', 'connector')
def get_url_params(self, primary_key=True):
params = {'connector': 'atreal-openads'}
if primary_key:
params['slug'] = self.slug
return params
def get_list_url(self):
raise Exception(u"AtrealOpenads:get_list_url() method should not be called")
def get_collectivite(self, openADS_id):
"""Return the 'collectivite' matching an openADS id."""
return Collectivite.objects.get(connecteur=self,openADS_id=openADS_id)
def log_json_payload(self, payload, title='payload', max_str_len=100):
"""Log a json paylod surrounded by dashes and with file content filtered."""
@ -186,7 +399,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.debug(u"%s", DictDumper(payload, max_str_len))
self.logger.debug(u"----- %s (end) -----", title)
def get_files_from_json_payload(self, payload, title='payload'):
"""Return files from a JSON payload with all checks and logging."""
@ -215,7 +427,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
# return the files
return files
def check_file_dict(self, dict_file, title='payload', b64=True):
"""Ensure a file dict has all its required items."""
@ -261,7 +472,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
# return the first file
return first
@endpoint(
description=_("Test an openADS 'connexion'")
#~ get={
@ -273,7 +483,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
#~ }
)
def check_status(self, request=None):
def check_status(self, request=None, *args, **kwargs):
"""Check avaibility of the openADS.API service."""
url = urlparse.urljoin(self.openADS_API_url, '__api__')
response = self.requests.get(url)
@ -282,6 +492,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
@endpoint(
perm='can_access',
methods=['post'],
pattern='^(?P<type_dossier>\w+)/?$',
example_pattern='{type_dossier}/',
@ -290,14 +501,15 @@ class AtrealOpenads(BaseResource, HTTPResource):
'collectivite': {
'description': _("Use this collectivite (instead of the default one)"),
'example_value': '3'
}
},
'now': {'description': _("Datetime (or string formatted to: '%Y-%m-%d %H:%M:%S') against which the 'guichet' is checked for opening"), 'example_value': 'DIA'},
},
post={'description': _("Create an openADS 'dossier'"),
'request_body': {
'schema': {
'application/json': JSON_SCHEMA_CREATE_DOSSIER_IN
}
},
}
#~ 'response_body': {
#~ 'schema': {
#~ 'application/json': JSON_SCHEMA_CREATE_DOSSIER_OUT
@ -305,7 +517,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
}
)
def create_dossier(self, request, type_dossier, collectivite=None):
def create_dossier(self, request, type_dossier, collectivite=None, now=None, *args, **kwargs):
# loads the request body as JSON content
json_data = json.loads(request.body)
@ -313,8 +525,43 @@ class AtrealOpenads(BaseResource, HTTPResource):
# log the request body (filtering the files content)
self.log_json_payload(json_data, 'request')
# get the collectivite ID or use the connecteur's default one
collectivite_id = collectivite if collectivite else self.default_collectivite_openADS_id
# get the collectivite instance
try:
collectivite = self.get_collectivite(collectivite_id)
# no collectivite instance matching that ID
except Collectivite.DoesNotExist:
pass
# a collectivite instance was found
else:
# the collectivite has a guichet
if (hasattr(collectivite, 'guichet') and collectivite.guichet):
# get the datetime against which the 'guichet' is checked for opening
now_fmt = '%Y-%m-%d %H:%M:%S'
if not now:
now = datetime.datetime.now()
elif isinstance(now, basestring):
now = datetime.datetime.strptime(now, now_fmt)
elif not isinstance(now, datetime.datetime):
raise APIError(
u"Invalid value of type '%s' for now argument of endpoint '%s' "
"(must be: %s)" % (
type(now),
'create_dossier',
"datetime or string formatted to '%s'" % now_fmt))
# if the guichet is not open
if not collectivite.guichet.is_open(now):
return {'message': _(u"Guichet closed for collectivite '%s'" % collectivite)}
# build the payload
payload = { "collectivite": int(collectivite) if collectivite else int(self.collectivite) }
payload = { "collectivite": int(collectivite_id) }
payload["terrain"] = {
"numero_voie": normalize(json_data['fields']['terrain_numero_voie']),
@ -362,6 +609,9 @@ class AtrealOpenads(BaseResource, HTTPResource):
"nom_voie" : normalize(json_data['fields']['%snom_voie' % prefix]),
"code_postal": normalize(json_data['fields']['%scode_postal' % prefix]),
"localite" : normalize(json_data['fields']['%slocalite' % prefix])
},
"coordonnees": {
"email": normalize(json_data['fields']['%semail' % prefix])
}
}
@ -404,11 +654,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
# set it as an upload
upload_file = ContentFile(content)
# build a hash from the upload
file_hash = self.file_digest(upload_file)
# build a filename (less than 50 chars)
filename = file_hash[45:] + '.pdf'
# get the file hash
file_hash = get_file_digest(upload_file)
# get the content type if specified
if 'content_type' in json_data['fields'][k]:
@ -419,9 +666,23 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
# get the filename if specified
filename = None
if 'filename' in json_data['fields'][k]:
filename = json_data['fields'][k]['filename']
# define the file extension
file_extension = get_file_extension(filename, content_type)
# filename not specified
if not filename:
# build a filename (less than 50 chars)
filename = file_hash[40:] + file_extension
# update the specified filename with an extension, if none
elif '.' not in filename:
filename += file_extension
# set the type fichier based on the key (less than 10 chars)
type_fichier = re.sub(r'_.*$', '', k)[:10]
@ -477,7 +738,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
# decode the recepisse from base 64
try:
recepisse_content = base64.b64decode(recepisse['b64_content'])
base64.b64decode(recepisse['b64_content'])
except TypeError:
raise APIError('Failed to decode recepisse content from base 64')
self.logger.debug("Successfully decoded recepisse from base 64")
@ -502,6 +763,9 @@ class AtrealOpenads(BaseResource, HTTPResource):
for f in files:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
FF = ForwardFile()
FF.connecteur = self
if isinstance(collectivite, Collectivite):
FF.collectivite = collectivite
FF.numero_demande = rand_id
FF.numero_dossier = numero_dossier
for k in ['type_fichier', 'orig_filename', 'content_type', 'file_hash']:
@ -519,6 +783,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
job = self.add_job('upload_user_files',
natural_id=numero_dossier,
request=None,
type_dossier=type_dossier,
numero_dossier=numero_dossier,
file_ids=file_ids)
@ -536,11 +801,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
'recepisse' : recepisse
}
@endpoint(
perm='can_access',
description=_("Get informations about an openADS 'dossier'"),
pattern='^(?P<type_dossier>\w+)/?$',
example_pattern='{type_dossier}/',
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}
@ -554,7 +819,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
#~ }
)
def get_dossier(self, request, type_dossier, numero_dossier):
def get_dossier(self, request, type_dossier, numero_dossier, *args, **kwargs):
# make a request to openADS.API
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s' % (type_dossier, numero_dossier))
@ -578,27 +843,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
# return the response as-is
return response.json()
def upload2ForwardFile(self, path, numero_dossier, type_fichier):
"""Convert a file path to a ForwardFile."""
if path:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
fwd_file = ForwardFile()
fwd_file.numero_demande = rand_id
fwd_file.numero_dossier = numero_dossier
fwd_file.type_fichier = type_fichier
fwd_file.orig_filename = os.path.basename(path)
fwd_file.content_type = magic.from_file(path, mime=True)
with open(path, 'r') as fp:
fwd_file.file_hash = self.file_digest(fp)
fwd_file.upload_file = File(open(path, 'r'))
fwd_file.upload_status = 'pending'
return fwd_file
return None
@endpoint(
perm='can_access',
description=_("Get informations about the forwarding of user files to openADS"),
pattern='^(?P<numero_dossier>\w+)/?$',
example_pattern='{numero_dossier}/',
parameters={
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
@ -612,7 +861,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
#~ }
)
def get_fwd_files(self, request, numero_dossier, fichier_id=None):
def get_fwd_files(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
payload = []
fwd_files = []
@ -651,9 +900,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
# return the payload containing the list of files
return payload
@endpoint(
perm='can_access',
description=_("Get informations about the forwarding of a user file to openADS"),
pattern='^(?P<numero_dossier>\w+)/?$',
example_pattern='{numero_dossier}/',
parameters={
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
@ -667,7 +918,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
#~ }
)
def get_fwd_files_status(self, request, numero_dossier, fichier_id=None):
def get_fwd_files_status(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
# get all files matching 'numero_dossier' and 'fichier_id'
fwd_files = self.get_fwd_files(request, numero_dossier, fichier_id)
@ -695,11 +946,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
# respond with the payload
return payload
@endpoint(
perm='can_access',
description= _("Get a 'courrier' from an openADS 'dossier'"),
pattern='^(?P<type_dossier>\w+)/?$',
example_pattern='{type_dossier}/',
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/(?P<lettre_type>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}/{lettre_type}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
@ -714,7 +965,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
#~ }
#~ }
)
def get_courrier(self, request, type_dossier, numero_dossier, lettre_type):
def get_courrier(self, request, type_dossier, numero_dossier, lettre_type, *args, **kwargs):
# make a request to openADS.API
url = urlparse.urljoin(
@ -742,14 +993,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
# decode the courrier from base 64
try:
courrier_content = base64.b64decode(courrier['b64_content'])
base64.b64decode(courrier['b64_content'])
except TypeError:
raise APIError('Failed to decode courrier content from base 64')
# return the 'courrier' file
return {'courrier': courrier}
def get_response_error(self, response):
"""Return a error string from an HTTP response."""
try:
@ -780,44 +1030,94 @@ class AtrealOpenads(BaseResource, HTTPResource):
detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else ''
return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '')
@endpoint(
perm='can_access',
description= _("Trigger the uploading of user's files to openADS"),
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'file_ids' : {'description': _("List of ForwardFile IDs to upload (coma separated)"), 'example_value': '12,18'}
},
#~ get={
#~ 'description': _("Trigger the uploading of user's files to openADS"),
#~ 'response_body': {
#~ 'schema': {
#~ 'application/json': JSON_SCHEMA_UPLOAD_USER_FILES
#~ }
#~ }
#~ }
)
# @raise ForwareFile.DoesNotExist if not found
def upload_user_files(self, type_dossier, numero_dossier, file_ids):
def upload_user_files(self, request, type_dossier, numero_dossier, file_ids=None, *args, **kwargs):
"""A Job to forward user uploaded files to openADS."""
payload = []
fwd_files = []
if file_ids:
# if file_ids is a string
if isinstance(file_ids, basestring):
file_ids = [int(fid) for fid in file_ids.split(',')]
# invalid input
elif not isinstance(file_ids, list):
raise TypeError(
"Invalid 'file_ids' argument type '%s' "
"(must be string or list)" % type(file_ids))
# a list of ForwardFile IDs was specified
if file_ids:
fwd_files = ForwardFile.objects.filter(id__in=file_ids).all()
# check that all ids where found
fwd_files_ids = set([ff.id for ff in fwd_files])
file_ids_diff = [item for item in file_ids if item not in fwd_files_ids]
if file_ids_diff:
raise ForwardFile.DoesNotExist(
"The following ForwardFile IDs were not found: %s." % file_ids_diff)
# filter out files not in status 'pending'
fwd_files_filtered = fwd_files.filter(upload_status='pending').all()
fwd_filtered_ids = set([ff.id for ff in fwd_files_filtered])
file_ids_diff = [item for item in file_ids if item not in fwd_filtered_ids]
if file_ids_diff:
self.logger.warning(
"The following ForwardFile IDs were not in status '%s' "
"when asked specificaly to upload them: %s." % ('pending', file_ids_diff))
fwd_files = fwd_files_filtered
# no files_ids where specified
else:
# process all ForwardFiles of the 'dossier' (in status 'pending')
fwd_files = ForwardFile.objects.filter(
numero_dossier=numero_dossier,
upload_status='pending'
).all()
# for every file ids specified (in parameters of this job)
for fid in file_ids:
self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fid)
for fwd_file in fwd_files:
self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fwd_file.id)
# get the matching forward file
fwd_file = ForwardFile.objects.get(id=fid)
# add the file content and data to the payload
payload.append({
'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
'content_type' : fwd_file.content_type,
'b64_content' : base64.b64encode(fwd_file.upload_file.read()),
'file_type' : fwd_file.type_fichier
})
self.logger.debug("upload_user_files() payload added")
# found one
if fwd_file:
self.logger.debug("upload_user_files() got ForwardFile")
# add the file content and data to the payload
payload.append({
'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
'content_type' : fwd_file.content_type,
'b64_content' : base64.b64encode(fwd_file.upload_file.read()),
'file_type' : fwd_file.type_fichier
})
self.logger.debug("upload_user_files() payload added")
# update the file upload data (status and attempts)
fwd_file.upload_status = 'uploading'
fwd_file.upload_attempt += 1
fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt
self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg)
fwd_file.save()
self.logger.debug("upload_user_files() ForwardFile saved")
# append the forwarded file to the list
fwd_files.append(fwd_file)
# update the file upload data (status and attempts)
fwd_file.upload_status = 'uploading'
fwd_file.upload_attempt += 1
fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt
self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg)
fwd_file.save()
self.logger.debug("upload_user_files() ForwardFile saved")
# if files need to be forwarded
if payload:
@ -837,6 +1137,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
# reponse is an error
if response.status_code // 100 != 2:
error = self.get_response_error(response)
self.logger.warning(u"Request [POST] '%s' failed with error: '%s'", url, error)
# update every files status as 'failed' and save the error message
for fwd_file in fwd_files:
@ -852,12 +1154,16 @@ class AtrealOpenads(BaseResource, HTTPResource):
file_ids
)
# respond with APIError
if request:
raise APIError(error)
# response is not an error
else:
# load the reponse as JSON
try:
result = response.json()
response.json()
# in case of failure
except ValueError:
@ -875,6 +1181,10 @@ class AtrealOpenads(BaseResource, HTTPResource):
fwd_files
)
# respond with APIError
if request:
raise APIError(u'No JSON content returned: %r' % response.content[:1000])
# response correctly loaded as JSON
else:
@ -885,20 +1195,21 @@ class AtrealOpenads(BaseResource, HTTPResource):
fwd_file.upload_status = 'success'
fwd_file.upload_msg = 'uploaded successfuly'
# delete file content (on success)
# save the file (content will be deleted automatically)
fpath = fwd_file.upload_file.path
fwd_file.upload_file.delete()
# save the file
fwd_file.save()
# log the success message
self.logger.debug(
u"upload_user_files() flaging file '%s' has transfered (deleted '%s')",
u"upload_user_files() flaging file '%s' as 'transfered' (deleted '%s')",
fwd_file.id,
fpath
)
# respond with success
if request:
return {'message': 'all files transfered successfully'}
# no file need to be forwarded
else:
self.logger.warning(
@ -907,15 +1218,6 @@ class AtrealOpenads(BaseResource, HTTPResource):
file_ids
)
# copy-pasted from 'wcs/qommon/misc.py'
def file_digest(self, content, chunk_size=100000):
"""Return a hash for the content specified."""
digest = hashlib.sha256()
content.seek(0)
def read_chunk():
return content.read(chunk_size)
for chunk in iter(read_chunk, ''):
digest.update(chunk)
return digest.hexdigest()
# respond with message
if request:
return {'message': 'no file to transfer'}

View File

@ -0,0 +1,9 @@
/* overrides for atreal_openads */
div#header h1.breadcrumbs a:last-child {
display: inline-block;
}
.connecteur-collectivites .collectivite .service-urbanisme {
margin-left: 0.1em;
}

View File

@ -0,0 +1,33 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.connecteur }} - {{ object.name }}{% else %}{% trans 'New' %}{% endif %}</h2>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
<div id="form-content">
{% csrf_token %}
{{ form|with_template }}
</div>
{% block buttons %}
<div class="buttons">
<button class="submit-button">{% trans "Save" %}</button>
<button class="cancel">{% trans "Cancel" %}</button>
</div>
{% endblock %}
</form>
{% endblock %}

View File

@ -0,0 +1,99 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name_plural }}</h2>
<span class="actions">
{% if collectivite_add_url %}
<a rel="popup" href="{{ collectivite_add_url }}?back-to=list-collectivites">{% trans "Add a Collectivite" %}</a>
{% endif %}
</span>
{% endblock %}
{% block content %}
<table id="list-collectivite">
{% block list %}
{% if object_list %}
<thead>
<tr>
<th class="actions">{% trans "Actions" %}</th>
{% for field in view.model.get_fields %}
{% if field.name != "connecteur" %}
<th class="{{field.name}}">
{% if field.name == "guichet" %}{% trans "Guichet" %}
{% elif field.name == "forward_file" %}{% trans "Forward Files" %}
{% else %}{% trans field.verbose_name %}
{% endif %}
</th>
{% endif %}
{% endfor %}
</tr>
</thead>
<tbody>
{% for col in object_list %}
<tr>
<td class="actions">
<span class="edit-link" ><a rel="popup" href="{{col.get_edit_url}}?back-to=list-collectivites" >{% trans "edit" %}</a></span>
<span class="delete-link"><a rel="popup" href="{{col.get_delete_url}}?back-to=list-collectivites">{% trans "delete" %}</a></span>
</td>
{% for field, value in col.get_fields_kv %}
{% if field.name != "connecteur" %}
<td class="{{field.name}}">
{% if field.name != "guichet" and field.name != "name" and field.name != "forward_file" %}
{{value}}
{% elif field.name == "name" %}
<a href="{{col.get_absolute_url}}">{{value}}</a>
{% elif field.name == "guichet" %}
{% if value %}<a href="{{value.get_absolute_url}}">{{value}}</a>{% else %}{% trans "None" %}{% endif %}
{% elif field.name == "forward_file" %}
{% if value %}<a href="{{ col.get_absolute_url }}/list-forward-files">{{ value.count }} {% trans "forward file(s)" %}</a>{% else %}{% trans "None" %}{% endif %}
{% endif %}
</td>
{% endif %}
{% endfor %}
</tr>
{% endfor %}
</tbody>
{% if is_paginated %}
<tfoot>
<tr>
<td class="pagination" colspan="{{ view.model.get_fields|length }}">
<span class="step-links">
{% if page_obj.has_previous %}
<a href="?page=1">&laquo; {% trans "first" %}</a>
<a href="?page={{ contacts.previous_page_number }}">{% trans "previous" %}</a>
{% endif %}
<span class="current">
{% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }}
</span>
{% if page_obj.has_next %}
<a href="?page={{ page_obj.next_page_number }}">{% trans "next" %}</a>
<a href="?page={{ page_obj.paginator.num_pages }}">{% trans "last" %} &raquo;</a>
{% endif %}
</span>
<td>
</tr>
</tfoot>
{% endif %}
{% else %}
<tr class="nodata">
<td>{% trans "No data" %}</td>
</tr>
{% endif %}
{% endblock %}
</table>
{% endblock %}

View File

@ -0,0 +1,63 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }}{% if object.name %} - {{ object.name }}{% endif %}</h2>
<span class="actions">
{% if object|can_delete:request.user %}
<a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
{% endif %}
{% if object|can_edit:request.user %}
<a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
{% endif %}
{% if not object.guichet and object|can_edit:request.user and guichet_add_url %}
<a rel="popup" href="{{ guichet_add_url }}">{% trans "Add a Guichet" %}</a>
{% endif %}
</span>
{% endblock %}
{% block content %}
<div id="description">
{% block description %}
{% for field, value in object.get_fields_kv %}
<p class="{{field.name}}">
{% if field.name == "guichet" %}
{% trans "Guichet" %} {% trans ":" %}
{% if value %}
<a href="{{value.get_absolute_url}}">{{value}}</a>
{% else %}
{% trans "None" %}
{% endif %}
{% elif field.name != "connecteur" and field.name != "forward_file" and field.name != "id" and field.name != "name" %}
{% trans field.verbose_name %} {% trans ":" %} {{value}}
{% endif %}
</p>
{% endfor %}
{% if object.forward_files.count %}
<p class="title">
{% trans "Forward files" %}
{% trans ":" %}
<a href="{% if forward_files_list_url %}{{forward_files_list_url}}{% else %}{{ object.forward_files.first.get_list_url }}{% endif %}">
{{ object.forward_files.count }}
{% if object.forward_files.count == 1 %}{% trans "forward file" %}
{% else %}{% trans "forward files" %}
{% endif %}
</a>
</p>
{% endif %}
{% endblock %}
</div>
{% endblock %}

View File

@ -0,0 +1,77 @@
{% extends "passerelle/manage/service_view.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {{ object.title }}
{% with status=object.get_availability_status %}
{% if status %}
{% if status.down %}<span class="down" title="{{status.message}} {% trans 'since:' %} {{status.start_timestamp|date:"SHORT_DATETIME_FORMAT"}} ">{% trans 'Down' %}</span>{% endif %}
{% endif %}
{% endwith %}
</h2>
<span class="actions">
{% if object|can_edit:request.user and has_check_status %}
<a rel="popup" href="{% url 'manage-availability' resource_type=object|resource_type resource_pk=object.id %}">{% trans 'availability check parameters' %}</a>
{% endif %}
{% if object|can_edit:request.user %}
<a rel="popup" href="{% url 'logging-parameters' resource_type=object|resource_type resource_pk=object.id %}">{% trans 'logging parameters' %}</a>
{% endif %}
{% if object|can_delete:request.user %}
<a rel="popup" href="{{ object.get_delete_url }}">{% trans 'delete' %}</a>
{% endif %}
{% if object|can_edit:request.user %}
<a rel="popup" href="{{ object.get_edit_url }}">{% trans 'edit' %}</a>
{% endif %}
{% if object|can_edit:request.user and collectivite_add_url%}
<a rel="popup" href="{{ collectivite_add_url }}">{% trans "Add a collectivite" %}</a>
{% endif %}
</span>
{% endblock %}
{% block description %}
{{ block.super }}
{% if object.id %}
{% if object.collectivites.count %}
{% if object.collectivites.count == 1 %}
<p class="title">
{% trans 'Collectivites' %}
{% trans ":" %}
<a href="{{ object.collectivites.first.get_absolute_url }}">{{ object.collectivites.first }}</a>
</p>
{% elif object.collectivites.count < 10 %}
<p class="title">{% trans 'Collectivites' %}</p>
<ul class="connecteur-collectivites">
{% for col in object.collectivites.all %}
<li class="collectivite"><a href="{{ col.get_absolute_url }}">{{ col.name }}</a>
{% if col.guichet %}<span class="guichet"> ( {{ col.guichet }} ) </span>{% endif %}
</li>
{% endfor %}
</ul>
{% else %}
<p class="title">
{% trans 'Collectivites' %}
{% trans ":" %}
<a href="{{ object.collectivites.first.get_list_url }}">{{ object.collectivites.count }} {% trans 'collectivites' %}</a>
</p>
{% endif %}
{% endif %}
{% if object.forward_files.count %}
<p class="title">
{% trans "Forward files" %}
{% trans ":" %}
<a href="{{ object.forward_files.first.get_list_url }}">
{{ object.forward_files.count }}
{% if object.forward_files.count == 1 %}{% trans "forward file" %}
{% else %}{% trans "forward files" %}
{% endif %}
</a>
</p>
{% endif %}
{% endif %}
{% endblock %}

View File

@ -0,0 +1,34 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{collectivite.get_absolute_url}}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}</h2>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
<div id="form-content">
{% csrf_token %}
{{ form|with_template }}
</div>
{% block buttons %}
<div class="buttons">
<button class="submit-button">{% trans "Save" %}</button>
<button class="cancel">{% trans "Cancel" %}</button>
</div>
{% endblock %}
</form>
{% endblock %}

View File

@ -0,0 +1,92 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{collectivite.get_absolute_url}}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% trans 'list' %}</h2>
{% endblock %}
{% block content %}
<table id="list-forward-files">
{% block list %}
{% if object_list %}
<thead>
<tr>
<th class="actions">{% trans "Actions" %}</th>
{% for field in view.model.get_fields %}
{% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %}
<th class="{{field.name}}">{% trans field.verbose_name %}</th>
{% endif %}
{% endfor %}
</tr>
</thead>
<tbody>
{% for ff in object_list %}
<tr>
<td class="actions">
<span class="edit-link"><a rel="popup" href="{{ff.get_edit_url}}?back-to={% if collectivite %}col-{% endif %}list-forward-files" >{% trans "edit" %}</a></span>
<span class="delete-link"><a rel="popup" href="{{ff.get_delete_url}}?back-to={% if collectivite %}col-{% endif %}list-forward-files">{% trans "delete" %}</a></span>
</td>
{% for field, value in ff.get_fields_kv %}
{% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %}
<td class="{{field.name}}">
{% if field.name != "collectivite" and field.name != "upload_file" %}
{{value}}
{% elif field.name == "collectivite" %}
{% if value %}<a href="{{value.get_absolute_url}}">{{value}}</a>
{% else %}{% trans "None" %}
{% endif %}
{% elif field.name == "upload_file" %}
{% if value %}<a href="{{value.url}}">{{value}}</a>
{% else %}{% trans "None" %}
{% endif %}
{% endif %}
</td>
{% endif %}
{% endfor %}
</tr>
{% endfor %}
</tbody>
{% if is_paginated %}
<tfoot>
<tr>
<td class="pagination" colspan="{{ view.model.get_fields|length }}">
<span class="step-links">
{% if page_obj.has_previous %}
<a href="?page=1">&laquo; {% trans "first" %}</a>
<a href="?page={{ contacts.previous_page_number }}">{% trans "previous" %}</a>
{% endif %}
<span class="current">
{% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }}
</span>
{% if page_obj.has_next %}
<a href="?page={{ page_obj.next_page_number }}">{% trans "next" %}</a>
<a href="?page={{ page_obj.paginator.num_pages }}">{% trans "last" %} &raquo;</a>
{% endif %}
</span>
<td>
</tr>
</tfoot>
{% endif %}
{% else %}
<tr class="nodata">
<td>{% trans "No data" %}</td>
</tr>
{% endif %}
{% endblock %}
</table>
{% endblock %}

View File

@ -0,0 +1,56 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{collectivite.get_absolute_url}}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}</h2>
<span class="actions">
{% if object|can_delete:request.user %}
<a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
{% endif %}
{% if object|can_edit:request.user %}
<a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
{% endif %}
</span>
{% endblock %}
{% block content %}
<div id="description">
{% block description %}
{% for field, value in object.get_fields_kv %}
<p class="{{field.name}}">
{% if field.name == "collectivite" %}
{% trans "Collectivite" %} {% trans ":" %}
{% if value %}
<a href="{{value.get_absolute_url}}">{{value}}</a>
{% else %}
{% trans "None" %}
{% endif %}
{% elif field.name == "upload_file" %}
{% trans "File" %} {% trans ":" %}
{% if value %}
<a href="{{value.url}}">{{value}}</a>
{% else %}
{% trans "None" %}
{% endif %}
{% elif field.name != "connecteur" and field.name != "id" %}
{% trans field.verbose_name %} {% trans ":" %} {{value}}
{% endif %}
</p>
{% endfor %}
{% endblock %}
</div>
{% endblock %}

View File

@ -0,0 +1,34 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{collectivite.get_absolute_url}}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }} {% if object.id and object.collectivite %} - {{ object.collectivite }}{% else %}{% trans 'New' %}{% endif %}</h2>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
<div id="form-content">
{% csrf_token %}
{{ form|with_template }}
</div>
{% block buttons %}
<div class="buttons">
<button class="submit-button">{% trans "Save" %}</button>
<button class="cancel">{% trans "Cancel" %}</button>
</div>
{% endblock %}
</form>
{% endblock %}

View File

@ -0,0 +1,44 @@
{% extends "passerelle/manage.html" %}
{% load i18n passerelle gadjo staticfiles %}
{% block extrascripts %}
{{ block.super }}
<link rel="stylesheet" type="text/css" media="all" href="{% static "css/atreal_openads.css" %}?{% start_timestamp %}"/>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
{% if connecteur %}<a href="{{connecteur.get_absolute_url}}">{{ connecteur.slug }}</a>{% endif %}
{% if collectivite %}<a href="{{collectivite.get_absolute_url}}">{{ collectivite.name }}</a>{% endif %}
{% endblock %}
{% block appbar %}
<h2>{{ view.model.get_verbose_name }}</h2>
<span class="actions">
{% if object|can_delete:request.user %}
<a rel="popup" href="{{ object.get_delete_url }}">{% trans "delete" %}</a>
{% endif %}
{% if object|can_edit:request.user %}
<a rel="popup" href="{{ object.get_edit_url }}">{% trans "edit" %}</a>
{% endif %}
</span>
{% endblock %}
{% block content %}
<div id="description">
{% block description %}
{% for field, value in object.get_fields_kv %}
{% if field.name != "id" %}
<p class="{{field.name}}">
{% if field.name != "collectivite" %}
{% trans field.verbose_name %} {% trans ":" %} {{value}}
{% endif %}
</p>
{% endif %}
{% endfor %}
{% endblock %}
</div>
{% endblock %}

91
atreal_openads/urls.py Normal file
View File

@ -0,0 +1,91 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
from django.conf.urls import url
from .views import (
AtrealOpenadsView,
ForwardFileView,
ForwardFileListView,
ForwardFileUpdateView,
ForwardFileDeleteView,
CollectiviteView,
CollectiviteListView,
CollectiviteCreateView,
CollectiviteUpdateView,
CollectiviteDeleteView,
GuichetView,
GuichetCreateView,
GuichetUpdateView,
GuichetDeleteView
)
urlpatterns = [
url(r'^(?P<slug>[\w,-]+)/$', AtrealOpenadsView.as_view(), name='view-connector')
]
management_urlpatterns = []
for view in [
ForwardFileView,
ForwardFileListView,
ForwardFileUpdateView,
ForwardFileDeleteView,
CollectiviteView,
CollectiviteListView,
CollectiviteCreateView,
CollectiviteUpdateView,
CollectiviteDeleteView,
GuichetView,
GuichetCreateView,
GuichetUpdateView,
GuichetDeleteView
]:
view_class_name = str(view.__name__)
m = re.search(r'^.*(Create|Update|Delete|List)View$', view_class_name)
if m:
view_action = m.group(1).lower()
else:
view_action = 'view'
# no prefix for action 'view'
url_prefix = view_action.replace('update', 'edit') + '-'
regex_base = r'^(?P<connecteur>[\w,-]+)/'
regex_pkey = '/(?P<pk>[\w,-]+)'
url_name = url_prefix + view.model.get_class_name_dash_case()
regex_url = '%s%s' % (url_prefix if view_action != 'view' else '',
view.model.get_class_name_dash_case())
# no primary key for action 'create' and 'list'
if view_action in ['create', 'list']:
regex_pkey = ''
# plural form of the url for action 'list' and no prefix
if view_action == 'list':
url_name = url_prefix + view.model.get_class_name_plural_dash_case()
regex_url = view.model.get_class_name_plural_dash_case()
# for 'guichet' prefix the regex by the collectivite
if view.model.get_class_name() == 'Guichet':
regex_base += 'collectivite/(?P<collectivite>[\w,-]+)/'
# build the regex
regex = regex_base + regex_url + regex_pkey + '$'
# add the url pattern to the management list
management_urlpatterns += [url(regex, view.as_view(), name=url_name)]
# add the ForwardFile 'list' url patterns for Collectivite
ff_list_regex_url = ForwardFileListView.model.get_class_name_plural_dash_case()
management_urlpatterns += [
url(
r'^(?P<connecteur>[\w,-]+)/collectivite/(?P<collectivite>[\w,-]+)/' + ff_list_regex_url + '$',
ForwardFileListView.as_view(),
name='col-list-' + ff_list_regex_url
)
]

258
atreal_openads/utils.py Normal file
View File

@ -0,0 +1,258 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import base64
import datetime
import re
import hashlib
import copy
import mimetypes
import sys
from HTMLParser import HTMLParser
from django.urls import reverse_lazy
def to_dash_case(camel_str):
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', camel_str)
return re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1).lower()
# from: https://stackoverflow.com/a/13848698
def force_encoded_string_output(func, default_enc='utf-8'):
"""Decorator function that return the result converted to str type."""
if sys.version_info.major < 3:
def _func(*args, **kwargs):
return func(*args, **kwargs).encode(sys.stdout.encoding or default_enc)
return _func
else:
return func
class MLStripper(HTMLParser):
"""HTML parser that removes html tags."""
def __init__(self):
self.reset()
self.fed = []
def handle_data(self, d):
self.fed.append(d)
def get_data(self):
return ''.join(self.fed)
def strip_tags(html):
"""Remove html tags from a string."""
s = MLStripper()
s.feed(html)
return s.get_data()
def clean_spaces(text):
"""Remove extra spaces an line breaks from a string."""
text = text.replace('\n', ' ')
text = text.replace('\r', ' ')
text = text.replace('\t', ' ')
text = text.replace('\\n', ' ')
text = text.replace('\\r', ' ')
text = text.replace('\\t', ' ')
return re.sub(r' +', ' ', text).strip()
def normalize(value):
"""Normalize a value to be send to openADS.API."""
if value is None:
return ''
if not isinstance(value, unicode):
value = unicode(value)
return clean_spaces(value)
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
# copy-pasted from 'wcs/qommon/misc.py'
def get_file_digest(content, chunk_size=100000):
"""Return a hash for the content specified."""
digest = hashlib.sha256()
content.seek(0)
def read_chunk():
return content.read(chunk_size)
for chunk in iter(read_chunk, ''):
digest.update(chunk)
return digest.hexdigest()
def get_upload_path(instance, filename=None):
"""Return a relative upload path for a file."""
fn_ref = instance.orig_filename if instance.orig_filename else filename
# file_hash and content_type attribute are updated on file save()
# so if the file was not yet saved, it may have those attributes undefined
# this is why we update them here, if they are empty
instance.update_file_hash(only_if_empty=True)
instance.update_content_type(only_if_empty=True)
# be careful:
# * openADS accept only filename less than 50 chars
# * name should be unique, even if the content is the same
return 'to_openADS__%s__%s%s' % (
datetime.datetime.now().strftime('%Y-%m-%d_%Hh%Mm%Ss%f'),
instance.file_hash[:4],
get_file_extension(fn_ref, instance.content_type)[:5]
)
def get_file_extension(filename, mimetype=None):
"""Return the extension of the file, according to its filename or specified mimetype."""
file_extension = None
if filename and '.' in filename:
file_extension = re.sub(r'^.*\.', '.', filename)
elif mimetype:
file_extension = mimetypes.guess_extension(mimetype)
return file_extension if file_extension else ''
def trunc_str_values(value, limit, visited=None, truncate_text=u''):
"""Truncate a string value (not dict keys) and append a truncate text."""
if visited is None:
visited = []
if not value in visited:
if isinstance(value, basestring) and len(value) > limit:
value = value[:limit] + truncate_text
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
visited.append(value)
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
for k,v in iterator:
value[k] = trunc_str_values(v, limit, visited, truncate_text)
return value
class DictDumper(object):
"""Helper to dump a dictionary to a string representation with lazy processing.
Only applied when dict is converted to string (lazy processing):
- long strings truncated (after the dict has been 'deep' copied)
- (optionaly) dict converted with json.dumps instead of unicode().
"""
def __init__(self, dic, max_str_len=255, use_json_dumps=True):
""" arguments:
- dic string the dict to dump
- max_str_len integer the maximul length of string values
- use_json_dumps boolean True to use json.dumps() else it uses unicode()
"""
self.dic = dic
self.max_str_len = max_str_len
self.use_json_dumps = use_json_dumps
@force_encoded_string_output
def __repr__(self):
return u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
self.dic, self.max_str_len, self.use_json_dumps)
@force_encoded_string_output
def __str__(self):
return unicode(self)
def __unicode__(self):
dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
return unicode(dict_ref)
class BaseModel(object):
"""A class that provide basic usefull functions.
Intended for all models to extends it.
"""
@classmethod
def get_verbose_name(cls):
"""Return the verbose name of the class (helper for META option)."""
return cls._meta.verbose_name
@classmethod
def get_verbose_name_plural(cls):
"""Return the plural form of the verbose name of the class (helper for META option)."""
return cls._meta.verbose_name_plural
@classmethod
def get_class_name(cls):
return cls.__name__
@classmethod
def get_class_name_plural(cls):
return cls.get_class_name() + 's'
@classmethod
def get_class_name_dash_case(cls):
return to_dash_case(cls.get_class_name())
@classmethod
def get_class_name_plural_dash_case(cls):
return to_dash_case(cls.get_class_name_plural())
@classmethod
def get_class_name_title(cls):
return cls.get_class_name_dash_case().replace('-', ' ').title()
@classmethod
def get_class_name_plural_title(cls):
return cls.get_class_name_plural_dash_case().replace('-', ' ').title()
@classmethod
def get_fields(cls):
"""Return the fields of the class (helper for META option)."""
return cls._meta.get_fields(include_parents=True, include_hidden=False)
@force_encoded_string_output
def __str__(self):
return unicode(self)
# mainly for the view
def get_fields_kv(self):
"""Return the model's list of field's key value."""
return [(field, getattr(self, field.name, None)) for field in self._meta.get_fields()]
def get_url_name(self, prefix='', plural=False):
class_name_dash_case = self.__class__.get_class_name_dash_case()
if plural:
class_name_dash_case = self.__class__.get_class_name_plural_dash_case()
return '%s%s' % (prefix + '-' if prefix else '', class_name_dash_case)
def get_url_params(self, primary_key=True):
return {'pk': self.id} if primary_key else {}
def get_absolute_url(self):
return reverse_lazy(self.get_url_name('view'), kwargs=self.get_url_params())
def get_edit_url(self):
return reverse_lazy(self.get_url_name('edit'), kwargs=self.get_url_params())
def get_delete_url(self):
return reverse_lazy(self.get_url_name('delete'), kwargs=self.get_url_params())
def get_list_url(self):
return reverse_lazy(self.get_url_name('list', True), kwargs=self.get_url_params(False))

319
atreal_openads/views.py Normal file
View File

@ -0,0 +1,319 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from django.urls import reverse_lazy
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from django.views.generic.edit import CreateView, UpdateView, DeleteView
from passerelle.views import GenericConnectorView
from .models import ForwardFile, Collectivite, Guichet, AtrealOpenads
from .forms import ForwardFileForm, CollectiviteForm, GuichetForm
def get_connecteur_from_request(view, key='connecteur'):
"""Return the 'connecteur' object from the view object."""
if not hasattr(view, 'connecteur') or not view.connecteur and view.request:
connecteur_slug = view.request.resolver_match.kwargs.get(key, None)
if connecteur_slug:
view.connecteur = AtrealOpenads.objects.get(slug=connecteur_slug)
return view.connecteur if hasattr(view, 'connecteur') else None
def get_collectivite_from_request(view, key='collectivite'):
"""Return the 'collectivite' object from the view object."""
if not hasattr(view, 'collectivite') or not view.collectivite and view.request:
collectivite_id = view.request.resolver_match.kwargs.get(key, None)
if collectivite_id:
view.collectivite = Collectivite.objects.get(id=collectivite_id)
return view.collectivite if hasattr(view, 'collectivite') else None
class ForwardFileView(DetailView):
model = ForwardFile
template_name = 'atreal_openads/manage/forwardfile_view.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
class ForwardFileListView(ListView):
model = ForwardFile
template_name = 'atreal_openads/manage/forwardfile_list.html'
paginate_by = 50
ordering = 'id'
def get_queryset(self):
qset = None
collectivite = get_collectivite_from_request(self)
if collectivite:
qset = super(ForwardFileListView, self).get_queryset().filter(
connecteur=get_connecteur_from_request(self),
collectivite=collectivite)
else:
qset = super(ForwardFileListView, self).get_queryset().filter(
connecteur=get_connecteur_from_request(self))
order_by = None
order_by_param = self.request.GET.get('order-by', None)
if order_by_param:
fields_names = [f.name for f in self.model.get_fields()]
order_by_field = order_by_param[1:] if order_by_param[0] == '-' else order_by_param
if order_by_field in fields_names:
order_by = order_by_param
return qset.order_by(order_by) if order_by else qset # qset.order_by()
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileListView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
class ForwardFileUpdateView(UpdateView):
model = ForwardFile
form_class = ForwardFileForm
template_name = 'atreal_openads/manage/forwardfile_form.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileUpdateView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
back_to = self.request.GET.get('back-to')
if back_to == 'list-forward-files':
return reverse_lazy('list-forward-files', kwargs={
'connecteur': get_connecteur_from_request(self).slug
})
elif back_to == 'col-list-forward-files':
obj = self.get_object()
if obj.collectivite:
return reverse_lazy('col-list-forward-files', kwargs={
'connecteur': get_connecteur_from_request(self).slug,
'collectivite': obj.collectivite.id
})
return self.get_object().get_absolute_url()
class ForwardFileDeleteView(DeleteView):
model = ForwardFile
form_class = ForwardFileForm
template_name = 'atreal_openads/manage/forwardfile_form.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileDeleteView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
back_to = self.request.GET.get('back-to')
if back_to == 'list-forward-files':
return reverse_lazy('list-forward-files', kwargs={
'connecteur': get_connecteur_from_request(self).slug
})
elif back_to == 'col-list-forward-files':
obj = self.get_object()
if obj.collectivite:
return reverse_lazy('col-list-forward-files', kwargs={
'connecteur': get_connecteur_from_request(self).slug,
'collectivite': obj.collectivite.id
})
return reverse_lazy('view-connector', kwargs={
'connector': 'atreal-openads',
'slug' : get_connecteur_from_request(self).slug
})
class CollectiviteView(DetailView):
model = Collectivite
template_name = 'atreal_openads/manage/collectivite_view.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['guichet_add_url'] = reverse_lazy('create-guichet', kwargs={
'connecteur' : context['connecteur'].slug,
'collectivite': self.get_object().id})
context['forward_files_list_url'] = reverse_lazy('col-list-forward-files', kwargs={
'connecteur' : context['connecteur'].slug,
'collectivite': self.get_object().id})
return context
class CollectiviteListView(ListView):
model = Collectivite
template_name = 'atreal_openads/manage/collectivite_list.html'
paginate_by = 50
ordering = 'id'
def get_queryset(self):
qset = super(CollectiviteListView, self).get_queryset().filter(
connecteur=get_connecteur_from_request(self))
order_by = None
order_by_param = self.request.GET.get('order-by', None)
if order_by_param:
fields_names = [f.name for f in self.model.get_fields()]
order_by_field = order_by_param[1:] if order_by_param[0] == '-' else order_by_param
if order_by_field in fields_names:
order_by = order_by_param
return qset.order_by(order_by) if order_by else qset # qset.order_by()
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteListView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
'connecteur': context['connecteur'].slug})
return context
class CollectiviteCreateView(CreateView):
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteCreateView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
def get_form_kwargs(self):
kwargs = super(CollectiviteCreateView, self).get_form_kwargs()
kwargs['connecteur'] = get_connecteur_from_request(self)
return kwargs
def get_success_url(self, *args, **kwargs):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
})
return reverse_lazy('view-connector', kwargs={
'connector': 'atreal-openads',
'slug' : get_connecteur_from_request(self).slug
})
class CollectiviteUpdateView(UpdateView):
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteUpdateView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
})
return self.get_object().get_absolute_url()
class CollectiviteDeleteView(DeleteView):
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteDeleteView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
})
return reverse_lazy('view-connector', kwargs={
'connector': 'atreal-openads',
'slug' : get_connecteur_from_request(self).slug
})
class GuichetView(DetailView):
model = Guichet
template_name = 'atreal_openads/manage/guichet_view.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
class GuichetCreateView(CreateView):
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
def get_form_kwargs(self):
kwargs = super(GuichetCreateView, self).get_form_kwargs()
kwargs['collectivite'] = get_collectivite_from_request(self)
return kwargs
def get_context_data(self, *args, **kwargs):
context = super(GuichetCreateView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self):
return reverse_lazy('view-collectivite', kwargs={
'connecteur': get_connecteur_from_request(self).slug,
'pk' : get_collectivite_from_request(self).id
})
class GuichetUpdateView(UpdateView):
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetUpdateView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
class GuichetDeleteView(DeleteView):
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetDeleteView, self).get_context_data(*args, **kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
return reverse_lazy('view-collectivite', kwargs={
'connecteur': get_connecteur_from_request(self).slug,
'pk' : get_collectivite_from_request(self).id
})
class AtrealOpenadsView(GenericConnectorView):
model = AtrealOpenads
template_name = 'atreal_openads/manage/connector_view.html'
def get_context_data(self, *args, **kwargs):
context = super(AtrealOpenadsView, self).get_context_data(*args, **kwargs)
context['collectivite_fields'] = Collectivite.get_fields()
context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
'connecteur': self.get_object().slug})
return context

View File

@ -12,32 +12,39 @@ import json
import os
import base64
import re
import datetime
import magic
from requests import Response
from django.http import Http404
from django.http.request import HttpRequest, QueryDict
from django.http.response import JsonResponse
from django.core.files import File
from django.core.files.base import ContentFile
#from django.db.models.query import QuerySet
from django.core.exceptions import ValidationError
from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import Job
from atreal_openads.models import (
strip_tags,
clean_spaces,
normalize,
from atreal_openads.utils import (
get_file_data,
get_upload_path,
trunc_str_values,
DictDumper,
AtrealOpenads,
ForwardFile
get_file_digest,
trunc_str_values
)
from atreal_openads.models import (
ForwardFile,
Guichet,
Collectivite,
AtrealOpenads
)
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 3
COLLECTIVITE = 79
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
@ -55,114 +62,221 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
collectivite = COLLECTIVITE,
default_collectivite_openADS_id = COLLECTIVITE,
openADS_API_url = OPENADS_API_URL,
basic_auth_username = OPENADS_API_LOGIN,
basic_auth_password = OPENADS_API_PASSWORD
)
@pytest.fixture
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
name = u'Macollectivité',
connecteur = atreal_openads,
openADS_id = '3'
)
def test_strip_tags():
s = 'aaa b cc '
assert strip_tags(s) == s
ss = s + '<em>dd'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + ' 1 < 3'
assert strip_tags(ss) == s + ' 1 < 3'
@pytest.fixture
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
collectivite = collectivite_1,
ouverture_jour_h = datetime.time(9, 0),
fermeture_jour_h = datetime.time(17, 0),
ouverture_sem_d = 1, # Lundi
fermeture_sem_d = 5, # Vendredi
ouverture_sem_h = datetime.time(8, 30),
fermeture_sem_h = datetime.time(12, 15)
)
def test_clean_spaces():
s = 'aaa b cc '
assert clean_spaces(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(s) == 'a a b b c c d d'
def upload2ForwardFile(connecteur, path, numero_dossier, type_fichier):
"""Convert a file path to a ForwardFile."""
if path:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
fwd_file = ForwardFile()
fwd_file.connecteur = connecteur
fwd_file.numero_demande = rand_id
fwd_file.numero_dossier = numero_dossier
fwd_file.type_fichier = type_fichier
fwd_file.orig_filename = os.path.basename(path)
fwd_file.content_type = magic.from_file(path, mime=True)
with open(path, 'r') as fp:
fwd_file.file_hash = get_file_digest(fp)
fwd_file.upload_file = File(open(path, 'r'))
fwd_file.upload_status = 'pending'
return fwd_file
return None
def test_normalize():
assert normalize(None) == ''
def test_forward_file(atreal_openads):
ff = ForwardFile(
numero_demande='45641531',
numero_dossier=FAKE_NUMERO_DOSSIER,
type_fichier='CERFA',
orig_filename='afile',
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
upload_status='pending',
connecteur=atreal_openads,
collectivite=None
)
ff.upload_file.save(ff.orig_filename, ContentFile(get_file_data(TEST_FILE_CERFA_DIA)))
ff.save()
s = 'aaa b cc '
assert normalize(s) == 'aaa b cc'
assert repr(ff) == (
u'ForwardFile(id=%s,connecteur=%s,collectivite=%s'
',demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % (
ff.id, unicode(ff.connecteur), None,
ff.numero_demande, ff.numero_dossier,
ff.type_fichier, ff.orig_filename, ff.upload_status
)
).encode('utf-8')
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(s) == 'a a b b c c d d'
assert str(ff) == '%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending')
assert unicode(ff) == u'%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending')
assert ff.get_status() == 'Pending'
assert ff.get_status('invalid') == 'invalid'
params = ff.get_url_params()
assert params['connecteur'] == atreal_openads.slug
assert ff.upload_file is not None
assert ff.upload_file.size > 0
assert ff.size == ff.upload_file.size
assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887'
ff.content_type = 'application/pdf'
ff.upload_status = 'success'
ff.save()
assert ff.upload_status == 'success'
assert ff.get_status() == 'Success'
assert ff.content_type == 'application/pdf'
with pytest.raises(ValueError) as e:
ff.upload_file.size
assert unicode(e.value) == "The 'upload_file' attribute has no file associated with it."
assert ff.size > 0
assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887'
ff.file_hash = ''
ff.update_file_hash()
ff.update_content_type()
ff.save()
assert ff.file_hash == ''
assert ff.content_type == ''
ff.orig_filename = ''
with pytest.raises(ValidationError) as e:
ff.save()
assert len(e.value.messages) == 1
assert '__all__' in e.value.message_dict
assert unicode(e.value.message_dict['__all__'][0]) == u"A %s cannot have all the following fields empty: %s." % (
ff.get_verbose_name(),
['file_hash', 'orig_filename', 'upload_file'])
ff.delete()
def test_get_file_data():
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
def test_collectivite(collectivite_1, collectivite_1_guichet):
col = collectivite_1
assert repr(col) == (
u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % (
1, unicode(col.name), unicode(col.connecteur), col.openADS_id,
unicode(col.guichet) if hasattr(col, 'guichet') else None
)
).encode('utf-8')
assert str(col) == col.name.encode('utf-8')
assert unicode(col) == col.name
class_fields = Collectivite.get_fields()
assert len(class_fields) == 6
assert class_fields[0].name == 'id'
assert class_fields[1].name == 'name'
assert class_fields[2].name == 'connecteur'
assert class_fields[3].name == 'openADS_id'
assert class_fields[4].name == 'guichet'
assert class_fields[5].name == 'forward_file'
instance_fields = col.get_fields_kv()
assert len(instance_fields) == 6
assert instance_fields[0][0].name == 'id'
assert instance_fields[1][0].name == 'name'
assert instance_fields[2][0].name == 'connecteur'
assert instance_fields[3][0].name == 'openADS_id'
assert instance_fields[4][0].name == 'guichet'
assert instance_fields[5][0].name == 'forward_file'
assert instance_fields[0][1] == col.id
assert instance_fields[1][1] == col.name
assert instance_fields[2][1] is col.connecteur
assert instance_fields[3][1] == col.openADS_id
assert instance_fields[4][1] is col.guichet
assert instance_fields[5][1] is None # shouldn't it be QuerySet?
params = col.get_url_params()
assert params['connecteur'] == col.connecteur.slug
def test_get_upload_path(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert re.search(
r"^pass_openADS_up_%s_%s$" %
('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'),
get_upload_path(FF))
def test_guichet(collectivite_1_guichet):
g = collectivite_1_guichet
assert repr(g) == (
u'Guichet(id=%s,collectivite=%s,%s)' % (
1, unicode(g.collectivite), unicode(g)
)
).encode('utf-8')
assert str(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]'.encode('utf-8')
assert unicode(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]'
params = g.get_url_params()
assert params['collectivite'] == g.collectivite.id
with pytest.raises(Exception) as e:
g.get_list_url()
assert unicode(e.value) == u"Guichet:get_list_url() method should not be called"
def test_trunc_str_values():
d = {}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789'}
assert trunc_str_values(d, 0) == {'a': u''}
d = {'a': '123456789'}
assert trunc_str_values(d, 1) == {'a': u'1…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 2) == {'a': u'12…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 5) == {'a': u'12345…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 9) == {'a': u'123456789'}
d = {'a': '123456789'}
assert trunc_str_values(d, 10) == d
def test_guichet_is_open(collectivite_1_guichet):
g = collectivite_1_guichet
d = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
dt_fmt = '%Y-%m-%d %H:%M'
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
d_monday = '2019-07-29'
d_sunday = '2019-07-28'
d_saturday = '2019-07-27'
d_friday = '2019-07-26'
d_thursday = '2019-07-25'
d_wednesday = '2019-07-24'
d_tuesday = '2019-07-22'
t_open = '10:44'
t_closed_before = '6:33'
t_closed_after = '20:08'
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
for d in [d_monday, d_tuesday, d_wednesday, d_thursday, d_friday]:
for t in [(t_open, True), (t_closed_before, False), (t_closed_after, False)]:
dt = datetime.datetime.strptime(d + ' ' + t[0], dt_fmt)
assert g.is_open(dt) == t[1]
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
dt = datetime.datetime.strptime(d_friday + ' 16:12', dt_fmt)
assert g.is_open(dt) == False
for d in [d_saturday, d_sunday]:
for t in [t_open, t_closed_before, t_closed_after]:
dt = datetime.datetime.strptime(d + ' ' + t, dt_fmt)
assert g.is_open(dt) == False
with pytest.raises(TypeError) as e:
g.is_open('invalid datetime')
assert unicode(e.value) == u"is_open() expect a datetime object (not a %s)" % type('')
assert g.is_open(None) == False
def test_dict_dumper():
d = {}
dd = DictDumper(d, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 0, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
d = {'a': '123456789'}
dd = DictDumper(d, 10, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 5, use_json_dumps=False)
assert d == dd.dic
assert unicode(dd) == unicode({'a': u'12345…'})
dd = DictDumper(d, 5, use_json_dumps=True)
assert d == dd.dic
assert unicode(dd) == u'{"a": "12345\\u2026"}'
def test_openads_log_json_payload(app, atreal_openads):
def test_openads_log_json_payload(atreal_openads):
# TODO implement
assert True
# change the debug file path
@ -170,7 +284,7 @@ def test_openads_log_json_payload(app, atreal_openads):
# check that what was is logged is correct
def test_openads_get_files_from_json_payload(app, atreal_openads):
def test_openads_get_files_from_json_payload(atreal_openads):
title = 'payload'
assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}]
@ -194,7 +308,7 @@ def test_openads_get_files_from_json_payload(app, atreal_openads):
assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title)
def test_check_file_dict(app, atreal_openads):
def test_check_file_dict(atreal_openads):
title = 'payload'
d = {
@ -235,7 +349,7 @@ def test_check_file_dict(app, atreal_openads):
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title)
def test_get_first_file_from_json_payload(app, atreal_openads):
def test_get_first_file_from_json_payload(atreal_openads):
title = 'payload'
d = {
@ -250,7 +364,7 @@ def test_get_first_file_from_json_payload(app, atreal_openads):
d, title, ensure_content=True, b64=False) == d['files'][0]
def test_openads_check_status(app, atreal_openads):
def test_openads_check_status(atreal_openads):
fake_resp_json = {
'message': 'Service online'
}
@ -261,7 +375,7 @@ def test_openads_check_status(app, atreal_openads):
assert jresp['response'] == 200
def test_openads_create_dossier(app, atreal_openads):
def test_openads_create_dossier(atreal_openads, collectivite_1, collectivite_1_guichet):
fake_req_json = {
"fields": {
@ -272,6 +386,7 @@ def test_openads_create_dossier(app, atreal_openads):
# mandataire
"mandataire_prenom" : "John",
"mandataire_nom" : "Man",
"mandataire_email" : "mandataire_email@domain.example",
"mandataire_qualite" : "Une personne morale",
"mandataire_qualite_raw" : "Une personne morale",
@ -288,6 +403,7 @@ def test_openads_create_dossier(app, atreal_openads):
# petitionnaire
"prenom": "Toto",
"nom" : "Loulou",
"email" : "petitionnaire_email@domain.example",
"qualite" : "Un particulier",
"qualite_raw": "Un particulier",
@ -325,12 +441,12 @@ def test_openads_create_dossier(app, atreal_openads):
"plan_cadastral_2": {
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
#"filename" : 'plan_cad'
},
"pouvoir_mandat": {
"content" : get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"filename" : 'mandat.pdf'
"filename" : 'mandat'
}
}
}
@ -354,15 +470,50 @@ def test_openads_create_dossier(app, atreal_openads):
with pytest.raises(ValueError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.create_dossier(req, 'DIA',collectivite='not an integer')
atreal_openads.create_dossier(req, 'DIA', collectivite='not an integer')
assert unicode(e.value) == "invalid literal for int() with base 10: 'not an integer'"
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.create_dossier(req, 'DIA')
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert unicode(e.value) == "HTTP error: 502"
# TODO update the code and return message when it will be
# correctly implemented in the openADS.API side.
fake_resp_404 = Response()
fake_resp_404.status_code = 404
fake_resp_404.reason = 'Page not found'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_404
atreal_openads.create_dossier(req, 'DIA', collectivite=999)
assert unicode(e.value) == "HTTP error: 404"
# guichet is open from Monday/8:30 to Friday/12:15, between 9:00 and 17:00
now = datetime.datetime(2019, 8, 10, 16, 0, 0)
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
assert jresp is not None
assert len(jresp) == 1
assert 'message' in jresp
assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1
now = '2019-08-10 16:00:00'
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
assert jresp is not None
assert len(jresp) == 1
assert 'message' in jresp
assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1
now = {'invalid': 'type'}
with pytest.raises(APIError) as e:
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now)
assert unicode(e.value) == u"Invalid value of type '%s' for now argument of endpoint '%s' (must be: %s)" % (
type(now),
'create_dossier',
"datetime or string formatted to '%s'" % '%Y-%m-%d %H:%M:%S')
fake_resp_json = {
'numero_dossier' : FAKE_NUMERO_DOSSIER,
'files': [{
@ -380,7 +531,7 @@ def test_openads_create_dossier(app, atreal_openads):
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
jresp = atreal_openads.create_dossier(req, 'DIA')
jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert jresp['numero_dossier'] == fake_resp_json['numero_dossier']
assert jresp['recepisse']['b64_content'] == fake_resp_json['files'][0]['b64_content']
assert jresp['recepisse']['content_type'] == 'application/pdf'
@ -391,7 +542,7 @@ def test_openads_create_dossier(app, atreal_openads):
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % (
'numero_dossier', 'string', type({}))
@ -400,7 +551,7 @@ def test_openads_create_dossier(app, atreal_openads):
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response"
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
@ -408,14 +559,14 @@ def test_openads_create_dossier(app, atreal_openads):
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert unicode(e.value) == u'Failed to decode recepisse content from base 64'
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id)
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
@ -425,7 +576,7 @@ def test_openads_create_dossier(app, atreal_openads):
assert job.method_name == 'upload_user_files'
assert job.natural_id == FAKE_NUMERO_DOSSIER
assert job.parameters is not None
assert len(job.parameters) == 3
assert len(job.parameters) == 4
assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 4
file_ids = job.parameters['file_ids']
@ -456,7 +607,7 @@ def test_openads_create_dossier(app, atreal_openads):
assert FF.upload_status == 'success'
def test_openads_get_dossier(app, atreal_openads):
def test_openads_get_dossier(atreal_openads):
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
@ -513,34 +664,7 @@ def test_openads_get_dossier(app, atreal_openads):
assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD"
def test_openads_upload2ForwardFile(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(None, None, None)
assert FF is None
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'cerfa'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
FF = atreal_openads.upload2ForwardFile(TEST_FILE_PLAN_CADASTRAL, FAKE_NUMERO_DOSSIER, 'plan')
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'plan'
assert FF.orig_filename == os.path.basename(TEST_FILE_PLAN_CADASTRAL)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
def test_openads_get_fwd_files(app, atreal_openads):
def test_openads_get_fwd_files(atreal_openads):
with pytest.raises(APIError) as e:
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer')
assert unicode(e.value) == u"fichier_id must be an integer"
@ -553,7 +677,7 @@ def test_openads_get_fwd_files(app, atreal_openads):
assert resp_empty is not None
assert len(resp_empty) == 0
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
@ -576,12 +700,12 @@ def test_openads_get_fwd_files(app, atreal_openads):
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
def test_openads_get_fwd_files_status(app, atreal_openads):
def test_openads_get_fwd_files_status(atreal_openads):
with pytest.raises(Http404) as e:
atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
@ -606,7 +730,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
assert len(jresp['failed']) == 0
def test_openads_get_courrier(app, atreal_openads):
def test_openads_get_courrier(atreal_openads):
lettre_type = 'dia_renonciation_preempter'
fake_resp_bad = Response()
@ -655,7 +779,7 @@ def test_openads_get_courrier(app, atreal_openads):
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
def test_get_response_error(app, atreal_openads):
def test_get_response_error(atreal_openads):
fake_resp_json = {
'errors': [
{
@ -685,16 +809,34 @@ def test_get_response_error(app, atreal_openads):
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content)
def test_openads_upload_user_files(app, atreal_openads):
def test_openads_upload_user_files(atreal_openads):
# TODO check logs (because this doesn't do anything but log)
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[])
req = HttpRequest()
req._body = ''
req.path = '/upload_user_files'
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'application/json'
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
with pytest.raises(ForwardFile.DoesNotExist) as e:
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
assert unicode(e.value) == u"ForwardFile matching query does not exist."
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
assert unicode(e.value) == u"The following ForwardFile IDs were not found: %s." % [999]
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
with pytest.raises(ValueError) as e:
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids='invalid string')
assert unicode(e.value) == u"invalid literal for int() with base 10: '%s'" % 'invalid string'
with pytest.raises(TypeError) as e:
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids={'invalid':'type'})
assert unicode(e.value) == u"Invalid 'file_ids' argument type '%s' (must be string or list)" % type({'invalid':'type'})
FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
assert FF.upload_status == 'pending'
@ -705,9 +847,11 @@ def test_openads_upload_user_files(app, atreal_openads):
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=str(file_id))
assert unicode(e.value) == u'HTTP error: 502'
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
@ -717,6 +861,9 @@ def test_openads_upload_user_files(app, atreal_openads):
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == "HTTP error: 502"
FFup.upload_status = 'pending'
FFup.save()
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
@ -724,9 +871,11 @@ def test_openads_upload_user_files(app, atreal_openads):
fake_resp.reason = 'OK'
fake_resp._content = 'invalid_;{[content}'
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp.content
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
@ -736,11 +885,19 @@ def test_openads_upload_user_files(app, atreal_openads):
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content
jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp == {'message': 'no file to transfer'}
FFup = ForwardFile.objects.get(id=file_id)
FFup.upload_status = 'pending'
FFup.save()
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp == {'message': 'all files transfered successfully'}
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)

130
tests/test_forms.py Normal file
View File

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import pytest
import os
import base64
import datetime
from django.core.files import File
from atreal_openads.forms import (
ForwardFileForm,
CollectiviteForm,
GuichetForm
)
from atreal_openads.models import (
ForwardFile,
Guichet,
Collectivite,
AtrealOpenads
)
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 79
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
TESTS_DIR = os.path.dirname(__file__)
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
default_collectivite_openADS_id = COLLECTIVITE,
openADS_API_url = OPENADS_API_URL,
basic_auth_username = OPENADS_API_LOGIN,
basic_auth_password = OPENADS_API_PASSWORD
)
@pytest.fixture
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
name = u'Macollectivité',
connecteur = atreal_openads,
openADS_id = '3'
)
@pytest.fixture
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
collectivite = collectivite_1,
ouverture_jour_h = datetime.time(9, 0),
fermeture_jour_h = datetime.time(17, 0),
ouverture_sem_d = 1, # Lundi
fermeture_sem_d = 5, # Vendredi
ouverture_sem_h = datetime.time(8, 30),
fermeture_sem_h = datetime.time(12, 15)
)
def test_forwardfile_form(atreal_openads, collectivite_1):
form = ForwardFileForm()
assert form.instance is not None
ff = ForwardFile(
connecteur = None,
collectivite = None,
numero_demande = '45641531',
numero_dossier = FAKE_NUMERO_DOSSIER,
type_fichier = 'CERFA',
orig_filename = os.path.basename(TEST_FILE_CERFA_DIA),
content_type = 'application/pdf',
file_hash = 'ffdf456fdsvgb4bgfb6g4f5b',
upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')),
upload_status = 'pending'
)
form_with_instance = ForwardFileForm(instance=ff, collectivite=collectivite_1)
assert form_with_instance.instance is ff
assert form_with_instance.instance.collectivite is collectivite_1
form_with_instance = ForwardFileForm(instance=ff, connecteur=atreal_openads)
assert form_with_instance.instance is ff
assert form_with_instance.instance.connecteur is atreal_openads
# TODO check the queryset of the collectivite
def test_collectivite_form(atreal_openads):
form = CollectiviteForm()
assert form.instance is not None
col = Collectivite(
connecteur = None,
name = u'Ma collectivité',
openADS_id = 3
)
form_with_instance = CollectiviteForm(instance=col, connecteur=atreal_openads)
assert form_with_instance.instance is col
assert form_with_instance.instance.connecteur is atreal_openads
def test_guichet_form(atreal_openads, collectivite_1):
form = GuichetForm()
assert form.instance is not None
gui = Guichet(
collectivite = None,
ouverture_jour_h = datetime.time(9, 0, 0),
fermeture_jour_h = datetime.time(18, 0, 0),
ouverture_sem_d = 1,
fermeture_sem_d = 5,
ouverture_sem_h = datetime.time(10, 30, 0),
fermeture_sem_h = datetime.time(12, 15, 0)
)
form_with_instance = GuichetForm(instance=gui, collectivite=collectivite_1)
assert form_with_instance.instance is gui
assert form_with_instance.instance.collectivite is collectivite_1

288
tests/test_utils.py Normal file
View File

@ -0,0 +1,288 @@
# -*- coding: utf-8 -*-
# to run it use the following command in the 'tests' directory:
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv
#
# and with 'coverage':
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads
import pytest
import os
import base64
import re
import datetime
from django.core.files import File
from django.core.files.base import ContentFile
from atreal_openads.utils import (
to_dash_case,
force_encoded_string_output,
strip_tags,
clean_spaces,
normalize,
get_file_data,
get_file_digest,
get_upload_path,
get_file_extension,
trunc_str_values,
DictDumper
)
from atreal_openads.models import (
ForwardFile,
Guichet,
Collectivite,
AtrealOpenads
)
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 79
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
TESTS_DIR = os.path.dirname(__file__)
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
default_collectivite_openADS_id = COLLECTIVITE,
openADS_API_url = OPENADS_API_URL,
basic_auth_username = OPENADS_API_LOGIN,
basic_auth_password = OPENADS_API_PASSWORD
)
@pytest.fixture
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
name = u'Macollectivité',
connecteur = atreal_openads,
openADS_id = '3'
)
@pytest.fixture
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
collectivite = collectivite_1,
ouverture_jour_h = datetime.time(9, 0),
fermeture_jour_h = datetime.time(17, 0),
ouverture_sem_d = 1, # Lundi
fermeture_sem_d = 5, # Vendredi
ouverture_sem_h = datetime.time(8, 30),
fermeture_sem_h = datetime.time(12, 15)
)
def test_to_dash_case():
s = 'ACamelCaseName'
assert to_dash_case(s) == 'a-camel-case-name'
assert to_dash_case('') == ''
def test_force_encoded_string_output():
def a_str_function():
return str('toto')
ret = force_encoded_string_output(a_str_function)()
assert isinstance(ret, str)
ret = force_encoded_string_output(a_str_function, 'latin1')()
assert isinstance(ret, str)
def an_unicode_function():
return u'toto'
ret = force_encoded_string_output(an_unicode_function)()
assert isinstance(ret, str)
ret = force_encoded_string_output(an_unicode_function, 'latin1')()
assert isinstance(ret, str)
def test_strip_tags():
s = 'aaa b cc '
assert strip_tags(s) == s
ss = s + '<em>dd'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + ' 1 < 3'
assert strip_tags(ss) == s + ' 1 < 3'
def test_clean_spaces():
s = 'aaa b cc '
assert clean_spaces(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(s) == 'a a b b c c d d'
def test_normalize():
assert normalize(None) == ''
s = 'aaa b cc '
assert normalize(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(s) == 'a a b b c c d d'
def test_get_file_data():
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
def test_get_file_digest():
with open(TEST_FILE_CERFA_DIA) as fd:
assert get_file_digest(fd) == 'cc90a620982760fdee16a5b4fe1b5ac3b4fe868fd02d2f70b27f1e46d283ea51'
def test_get_upload_path():
ff = ForwardFile(
numero_demande='45641531',
numero_dossier=FAKE_NUMERO_DOSSIER,
type_fichier='CERFA',
orig_filename=os.path.basename(TEST_FILE_CERFA_DIA),
content_type='application/pdf',
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
upload_file=File(open(TEST_FILE_CERFA_DIA, 'r')),
upload_status='pending',
connecteur=None,
collectivite=None
)
regex = r"^to_openADS__%s__%s\.pdf$" % (
'[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'ffdf')
assert re.search(regex, get_upload_path(ff))
def test_get_file_extension():
assert get_file_extension('afile.pdf') == '.pdf'
assert get_file_extension('afile', 'application/pdf') == '.pdf'
assert get_file_extension('') == ''
assert get_file_extension('afile') == ''
def test_trunc_str_values():
d = {}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789'}
assert trunc_str_values(d, 0) == {'a': u''}
d = {'a': '123456789'}
assert trunc_str_values(d, 1) == {'a': u'1…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 2) == {'a': u'12…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 5) == {'a': u'12345…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 9) == {'a': u'123456789'}
d = {'a': '123456789'}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
def test_dict_dumper():
d = {}
dd = DictDumper(d, use_json_dumps=False)
assert repr(dd) == (u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
d, dd.max_str_len, dd.use_json_dumps)).encode('utf-8')
assert str(dd) == '{}'
assert unicode(dd) == u'{}'
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 0, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
d = {'a': '123456789'}
dd = DictDumper(d, 10, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 5, use_json_dumps=False)
assert d == dd.dic
assert unicode(dd) == unicode({'a': u'12345…'})
dd = DictDumper(d, 5, use_json_dumps=True)
assert d == dd.dic
assert unicode(dd) == u'{"a": "12345\\u2026"}'
def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet):
ff = ForwardFile(
numero_demande='45641531',
numero_dossier=FAKE_NUMERO_DOSSIER,
type_fichier='CERFA',
orig_filename=os.path.basename(TEST_FILE_CERFA_DIA),
content_type='application/pdf',
file_hash='ffdf456fdsvgb4bgfb6g4f5b',
upload_file=ContentFile('toto'),
upload_status='pending',
connecteur=atreal_openads,
collectivite=None
)
assert ff.get_verbose_name() == 'Forward File'
assert ff.get_verbose_name_plural() == 'Forward Files'
assert ff.get_class_name() == 'ForwardFile'
assert ff.get_class_name_plural() == 'ForwardFiles'
assert ff.get_class_name_dash_case() == 'forward-file'
assert ff.get_class_name_plural_dash_case() == 'forward-files'
assert ff.get_class_name_title() == 'Forward File'
assert ff.get_class_name_plural_title() == 'Forward Files'
assert ff.get_url_name('list', plural=True) == 'list-forward-files'
assert ff.get_absolute_url() == '/manage/atreal-openads/atreal/forward-file/None'
assert ff.get_edit_url() == '/manage/atreal-openads/atreal/edit-forward-file/None'
assert ff.get_delete_url() == '/manage/atreal-openads/atreal/delete-forward-file/None'
assert ff.get_list_url() == '/manage/atreal-openads/atreal/forward-files'
assert atreal_openads.get_class_name_plural() == 'AtrealOpenads'
assert atreal_openads.get_url_name('view') == 'view-connector'
params = atreal_openads.get_url_params(True)
assert params['connector'] == 'atreal-openads'
assert params['slug'] == atreal_openads.slug
with pytest.raises(Exception) as e:
atreal_openads.get_list_url()
assert unicode(e.value) == u"AtrealOpenads:get_list_url() method should not be called"
# TODO add more collectivite test cases
with pytest.raises(Exception) as e:
collectivite_1_guichet.get_list_url()
assert unicode(e.value) == u"Guichet:get_list_url() method should not be called"

413
tests/test_views.py Normal file
View File

@ -0,0 +1,413 @@
# -*- coding: utf-8 -*-
import pytest
import os
import base64
import datetime
from django.http.request import HttpRequest, QueryDict
from django.urls.base import resolve
from django.core.files import File
from atreal_openads.views import (
get_connecteur_from_request,
get_collectivite_from_request,
AtrealOpenadsView,
ForwardFileView,
ForwardFileListView,
ForwardFileUpdateView,
ForwardFileDeleteView,
CollectiviteView,
CollectiviteListView,
CollectiviteCreateView,
CollectiviteUpdateView,
CollectiviteDeleteView,
GuichetView,
GuichetCreateView,
GuichetUpdateView,
GuichetDeleteView
)
from atreal_openads.models import (
ForwardFile,
Guichet,
Collectivite,
AtrealOpenads
)
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 79
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
TESTS_DIR = os.path.dirname(__file__)
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
default_collectivite_openADS_id = COLLECTIVITE,
openADS_API_url = OPENADS_API_URL,
basic_auth_username = OPENADS_API_LOGIN,
basic_auth_password = OPENADS_API_PASSWORD
)
@pytest.fixture
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
name = u'Macollectivité',
connecteur = atreal_openads,
openADS_id = '3'
)
@pytest.fixture
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
collectivite = collectivite_1,
ouverture_jour_h = datetime.time(9, 0),
fermeture_jour_h = datetime.time(17, 0),
ouverture_sem_d = 1, # Lundi
fermeture_sem_d = 5, # Vendredi
ouverture_sem_h = datetime.time(8, 30),
fermeture_sem_h = datetime.time(12, 15)
)
@pytest.fixture
def forwardfile_1(db, atreal_openads, collectivite_1):
return ForwardFile.objects.create(
connecteur = atreal_openads,
collectivite = collectivite_1,
numero_demande = '45641531',
numero_dossier = FAKE_NUMERO_DOSSIER,
type_fichier = 'CERFA',
orig_filename = os.path.basename(TEST_FILE_CERFA_DIA),
content_type = 'application/pdf',
file_hash = 'ffdf456fdsvgb4bgfb6g4f5b',
upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')),
upload_status = 'pending'
)
def test_get_connecteur_from_request(atreal_openads, forwardfile_1):
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
atreal_openads.slug, forwardfile_1.id)
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = ForwardFileView()
view.request = req
connecteur = get_connecteur_from_request(view)
assert connecteur is not None
assert connecteur.slug == atreal_openads.slug
def test_get_collectivite_from_request(atreal_openads, collectivite_1):
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = ForwardFileListView()
view.request = req
collectivite = get_collectivite_from_request(view)
assert collectivite is not None
assert collectivite.id == collectivite_1.id
def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
atreal_openads.slug, forwardfile_1.id)
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = ForwardFileView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
view = ForwardFileUpdateView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/forward-file/%s' % (
atreal_openads.slug, forwardfile_1.id)
req.GET['back-to'] = 'list-forward-files'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
req.GET['back-to'] = 'col-list-forward-files'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
view = ForwardFileDeleteView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-forward-files'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
req.GET['back-to'] = 'col-list-forward-files'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
req.resolver_match = resolve(req.path)
view = ForwardFileListView()
view.request = req
view.object_list = []
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['-last_update_datetime']
assert qs.ordered
req.GET['order-by'] = '-id'
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['-id']
assert qs.query.default_ordering == True
req.path = '/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
req.resolver_match = resolve(req.path)
del(req.GET['back-to'])
del(req.GET['order-by'])
view = ForwardFileListView()
view.request = req
view.object_list = []
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['-last_update_datetime']
assert qs.ordered
def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s' % (
atreal_openads.slug, collectivite_1.id)
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = CollectiviteView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['guichet_add_url'] == u'/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
atreal_openads.slug, collectivite_1.id)
assert context['forward_files_list_url'] == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
view = CollectiviteUpdateView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivite/%s' % (
atreal_openads.slug, collectivite_1.id)
req.GET['back-to'] = 'list-collectivites'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
view = CollectiviteDeleteView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-collectivites'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
view = CollectiviteCreateView()
req.path = '/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
req.resolver_match = resolve(req.path)
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
kwargs = view.get_form_kwargs()
assert kwargs['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-collectivites'
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
req.path = '/manage/atreal-openads/%s/collectivites' % atreal_openads.slug
req.resolver_match = resolve(req.path)
view = CollectiviteListView()
view.request = req
view.object_list = []
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['name']
assert qs.ordered
req.GET['order-by'] = '-id'
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['-id']
assert qs.query.default_ordering == True
def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet):
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s/guichet/%s' % (
atreal_openads.slug, collectivite_1.id, collectivite_1_guichet.id)
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = GuichetView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
view = GuichetUpdateView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
view = GuichetDeleteView()
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivite/%s' % (
atreal_openads.slug, collectivite_1.id)
view = GuichetCreateView()
req.path = '/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
atreal_openads.slug, collectivite_1.id)
req.resolver_match = resolve(req.path)
view.request = req
view.object = None
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
kwargs = view.get_form_kwargs()
assert kwargs['collectivite'].id == collectivite_1.id
url = view.get_success_url()
assert url == u'/manage/atreal-openads/%s/collectivite/%s' % (
atreal_openads.slug, collectivite_1.id)
def test_connecteur_view(atreal_openads):
req = HttpRequest()
req.path = '/atreal-openads/%s/' % atreal_openads.slug
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req.resolver_match = resolve(req.path)
view = AtrealOpenadsView()
view.request = req
view.object = atreal_openads
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['collectivite_fields'] == Collectivite.get_fields()
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug