diff --git a/atreal_openads/forms.py b/atreal_openads/forms.py new file mode 100644 index 0000000..4c7a398 --- /dev/null +++ b/atreal_openads/forms.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from django.forms import ModelForm + +from .models import ForwardFile, Collectivite, Guichet + +class ForwardFileForm(ModelForm): + class Meta: + model = ForwardFile + exclude = ['connecteur', 'size', 'file_hash'] + + def __init__(self, *args, **kwargs): + + connecteur = kwargs.pop('connecteur' , None) + collectivite = kwargs.pop('collectivite', None) + + super(ForwardFileForm, self).__init__(*args, **kwargs) + + if ( + (not hasattr(self.instance, 'connecteur') or not self.instance.connecteur) + and connecteur + ): + self.instance.connecteur = connecteur + if ( + (not hasattr(self.instance, 'collectivite') or not self.instance.collectivite) + and collectivite + ): + self.instance.collectivite = collectivite + + # only allow to select a 'collectivite' that belongs to the connecteur + if hasattr(self.instance, 'connecteur') and self.instance.connecteur: + self.fields['collectivite'].queryset = Collectivite.objects.filter(connecteur=self.instance.connecteur) + + # TODO if the status is 'uploading' make everything read-only + + +class CollectiviteForm(ModelForm): + + class Meta: + model = Collectivite + exclude = ['connecteur'] + + def __init__(self, *args, **kwargs): + connecteur = kwargs.pop('connecteur', None) + super(CollectiviteForm, self).__init__(*args, **kwargs) + if ( + (not hasattr(self.instance, 'connecteur') or not self.instance.connecteur) + and connecteur + ): + self.instance.connecteur = connecteur + + +class GuichetForm(ModelForm): + + class Meta: + model = Guichet + exclude = ['collectivite'] + + def __init__(self, *args, **kwargs): + collectivite = kwargs.pop('collectivite', None) + super(GuichetForm, self).__init__(*args, **kwargs) + if ( + (not hasattr(self.instance, 'collectivite') or not self.instance.collectivite) + and collectivite + ): + self.instance.collectivite = collectivite diff --git a/atreal_openads/migrations/0001_initial.py b/atreal_openads/migrations/0001_initial.py index e011449..8ff24ad 100644 --- a/atreal_openads/migrations/0001_initial.py +++ b/atreal_openads/migrations/0001_initial.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- -# Generated by Django 1.11.15 on 2019-07-18 20:51 +# Generated by Django 1.11.18 on 2019-08-20 15:02 from __future__ import unicode_literals -import atreal_openads.models +import atreal_openads.utils from django.db import migrations, models +import django.db.models.deletion class Migration(migrations.Migration): @@ -28,13 +29,30 @@ class Migration(migrations.Migration): ('trusted_certificate_authorities', models.FileField(blank=True, null=True, upload_to=b'', verbose_name='TLS trusted CAs')), ('verify_cert', models.BooleanField(default=True, verbose_name='TLS verify certificates')), ('http_proxy', models.CharField(blank=True, max_length=128, verbose_name='HTTP and HTTPS proxy')), - ('collectivite', models.CharField(blank=True, default=b'', help_text='ex: Marseille, or ex: 3', max_length=255, verbose_name='Collectivity (identifier)')), + ('default_collectivite_openADS_id', models.PositiveIntegerField(blank=True, default=0, help_text='ex: 3', verbose_name="Default 'collectivite' (identifier in openADS)")), ('openADS_API_url', models.URLField(default=b'', help_text='ex: https://openads.your_domain.net/api/', max_length=255, verbose_name='openADS API URL')), ('users', models.ManyToManyField(blank=True, related_name='_atrealopenads_users_+', related_query_name='+', to='base.ApiUser')), ], options={ + 'ordering': ['openADS_API_url'], 'verbose_name': 'openADS', + 'verbose_name_plural': 'openADS', }, + bases=(models.Model, atreal_openads.utils.BaseModel), + ), + migrations.CreateModel( + name='Collectivite', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(blank=True, default=b'', max_length=150)), + ('openADS_id', models.PositiveIntegerField(help_text='ex: 3', verbose_name='openADS identifier')), + ('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='collectivites', related_query_name='collectivite', to='atreal_openads.AtrealOpenads')), + ], + options={ + 'ordering': ['name'], + 'verbose_name': 'Collectivite', + }, + bases=(models.Model, atreal_openads.utils.BaseModel), ), migrations.CreateModel( name='ForwardFile', @@ -46,11 +64,90 @@ class Migration(migrations.Migration): ('file_hash', models.CharField(blank=True, default=b'', max_length=100)), ('orig_filename', models.CharField(blank=True, default=b'', max_length=100)), ('content_type', models.CharField(blank=True, default=b'', max_length=100)), - ('upload_file', models.FileField(null=True, upload_to=atreal_openads.models.get_upload_path)), + ('size', models.PositiveIntegerField(default=0)), + ('upload_file', models.FileField(blank=True, null=True, upload_to=atreal_openads.utils.get_upload_path)), ('upload_attempt', models.PositiveIntegerField(blank=True, default=0)), - ('upload_status', models.CharField(blank=True, default=b'', max_length=10)), + ('upload_status', models.CharField(choices=[(b'pending', 'Pending'), (b'uploading', 'Uploading'), (b'failed', 'Failed'), (b'success', 'Success')], default=b'pending', max_length=10)), ('upload_msg', models.CharField(blank=True, default=b'', max_length=255)), ('last_update_datetime', models.DateTimeField(auto_now=True)), + ('collectivite', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.Collectivite')), + ('connecteur', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='forward_files', related_query_name='forward_file', to='atreal_openads.AtrealOpenads')), ], + options={ + 'ordering': ['-last_update_datetime'], + 'verbose_name': 'Forward File', + }, + bases=(models.Model, atreal_openads.utils.BaseModel), + ), + migrations.CreateModel( + name='Guichet', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('ouverture_jour_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each day)')), + ('fermeture_jour_h', models.TimeField(help_text='ex: 17:00', verbose_name='Hour of closing (each day)')), + ('ouverture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=1, help_text='ex: Lundi', verbose_name='Day of opening (each week)')), + ('fermeture_sem_d', models.PositiveIntegerField(choices=[(1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'), (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday'), (7, 'Sunday')], default=6, help_text='ex: Samedi', verbose_name='Day of closing (each week)')), + ('ouverture_sem_h', models.TimeField(help_text='ex: 08:30', verbose_name='Hour of opening (each week)')), + ('fermeture_sem_h', models.TimeField(help_text='ex: 12:15', verbose_name='Hour of closing (each week)')), + ('collectivite', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='guichet', to='atreal_openads.Collectivite')), + ], + options={ + 'ordering': ['collectivite'], + 'verbose_name': 'Guichet', + 'verbose_name_plural': 'Guichets Urbanisme', + }, + bases=(models.Model, atreal_openads.utils.BaseModel), + ), + migrations.AddIndex( + model_name='guichet', + index=models.Index(fields=[b'collectivite'], name=b'su_collectivite_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'connecteur'], name=b'ff_connecteur_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'collectivite'], name=b'ff_collectivite_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'numero_demande', b'numero_dossier'], name=b'ff_deman_doss_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'numero_demande'], name=b'ff_demande_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'numero_dossier'], name=b'ff_dossier_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'orig_filename'], name=b'ff_filename_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'upload_status'], name=b'ff_status_idx'), + ), + migrations.AddIndex( + model_name='forwardfile', + index=models.Index(fields=[b'last_update_datetime'], name=b'ff_last_up_dt_idx'), + ), + migrations.AddIndex( + model_name='collectivite', + index=models.Index(fields=[b'connecteur', b'openADS_id'], name=b'col_conn_openADSid_idx'), + ), + migrations.AddIndex( + model_name='collectivite', + index=models.Index(fields=[b'connecteur'], name=b'col_connecteur_idx'), + ), + migrations.AddIndex( + model_name='collectivite', + index=models.Index(fields=[b'openADS_id'], name=b'col_openADS_id_idx'), + ), + migrations.AlterUniqueTogether( + name='collectivite', + unique_together=set([('connecteur', 'openADS_id')]), ), ] diff --git a/atreal_openads/models.py b/atreal_openads/models.py index bf08c55..c31f256 100644 --- a/atreal_openads/models.py +++ b/atreal_openads/models.py @@ -24,161 +24,374 @@ import datetime import os import re import magic -import hashlib -import copy - -from HTMLParser import HTMLParser from django.db import models from django.http import Http404 from django.utils.translation import ugettext_lazy as _ -from django.core.files import File from django.core.files.base import ContentFile +from django.core.exceptions import ValidationError from passerelle.base.models import BaseResource, HTTPResource from passerelle.utils.api import endpoint from passerelle.utils.jsonresponse import APIError from .json_schemas import ( - JSON_SCHEMA_CHECK_STATUS_OUT, JSON_SCHEMA_CREATE_DOSSIER_IN, - JSON_SCHEMA_CREATE_DOSSIER_OUT, - JSON_SCHEMA_GET_DOSSIER_OUT, - JSON_SCHEMA_GET_FWD_FILES_OUT, - JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT, - JSON_SCHEMA_GET_COURRIER_OUT + #JSON_SCHEMA_CHECK_STATUS_OUT, + #JSON_SCHEMA_CREATE_DOSSIER_OUT, + #JSON_SCHEMA_GET_DOSSIER_OUT, + #JSON_SCHEMA_GET_FWD_FILES_OUT, + #JSON_SCHEMA_GET_FWD_FILES_STATUS_OUT, + #JSON_SCHEMA_GET_COURRIER_OUT +) + +from .utils import ( + force_encoded_string_output, + strip_tags, + clean_spaces, + normalize, + get_file_digest, + get_upload_path, + get_file_extension, + trunc_str_values, + DictDumper, + BaseModel ) -class MLStripper(HTMLParser): - """HTML parser that removes html tags.""" - def __init__(self): - self.reset() - self.fed = [] - def handle_data(self, d): - self.fed.append(d) - def get_data(self): - return ''.join(self.fed) - - -def strip_tags(html): - """Remove html tags from a string.""" - s = MLStripper() - s.feed(html) - return s.get_data() - - -def clean_spaces(text): - """Remove extra spaces an line breaks from a string.""" - text = text.replace('\n', ' ') - text = text.replace('\r', ' ') - text = text.replace('\t', ' ') - text = text.replace('\\n', ' ') - text = text.replace('\\r', ' ') - text = text.replace('\\t', ' ') - return re.sub(r' +', ' ', text).strip() - - -def normalize(value): - """Normalize a value to be send to openADS.API.""" - if value is None: - return '' - if not isinstance(value, unicode): - value = unicode(value) - return clean_spaces(value) - - -def get_file_data(path, b64=True): - """Return the content of a file as a string, in base64 if specified.""" - with open(path, 'r') as f: - if b64: - return base64.b64encode(f.read()) - return f.read() - - -def get_upload_path(instance, filename=None): - """Return a relative upload path for a file.""" - # be careful: - # * openADS accept only filename less than 50 chars - # * name should be unique, even if the content is the same - return 'pass_openADS_up_%s_%s' % ( - datetime.datetime.now().strftime('%Y-%b-%d_%Hh%Mm%Ss%f'), - instance.file_hash[:4] - ) - - -def trunc_str_values(value, limit, visited=None, truncate_text=u'…'): - """Truncate a string value (not dict keys) and append a truncate text.""" - - if visited is None: - visited = [] - if not value in visited: - if isinstance(value, basestring) and len(value) > limit: - value = value[:limit] + truncate_text - elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple): - visited.append(value) - iterator = value.iteritems() if isinstance(value, dict) else enumerate(value) - for k,v in iterator: - value[k] = trunc_str_values(v, limit, visited, truncate_text) - return value - - -class DictDumper(object): - """Helper to dump a dictionary to a string representation with lazy processing. - - Only applied when dict is converted to string (lazy processing): - - long strings truncated (after the dict has been 'deep' copied) - - (optionaly) dict converted with json.dumps instead of unicode(). - """ - - def __init__(self, dic, max_str_len=255, use_json_dumps=True): - """ arguments: - - dic string the dict to dump - - max_str_len integer the maximul length of string values - - use_json_dumps boolean True to use json.dumps() else it uses unicode() - """ - self.dic = dic - self.max_str_len = max_str_len - self.use_json_dumps = use_json_dumps - - def __str__(self): - dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len) - dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc - return unicode(dict_ref) - - -class ForwardFile(models.Model): +class ForwardFile(models.Model, BaseModel): """Represent a file uploaded by a user, to be forwarded to openADS.API.""" + + STATUSES = [ + ('pending' , _('Pending')), + ('uploading', _('Uploading')), + ('failed' , _('Failed')), + ('success' , _('Success')) + ] + + connecteur = models.ForeignKey('AtrealOpenads', + on_delete=models.CASCADE, + related_name="forward_files", + related_query_name="forward_file") + collectivite = models.ForeignKey('Collectivite', blank=True, null=True, + on_delete=models.CASCADE, + related_name="forward_files", + related_query_name="forward_file") numero_demande = models.CharField(max_length=20) numero_dossier = models.CharField(max_length=20) type_fichier = models.CharField(max_length=10) file_hash = models.CharField(max_length=100, default='', blank=True) orig_filename = models.CharField(max_length=100, default='', blank=True) content_type = models.CharField(max_length=100, default='', blank=True) - upload_file = models.FileField(upload_to=get_upload_path, null=True) + size = models.PositiveIntegerField(default=0) + upload_file = models.FileField(upload_to=get_upload_path, blank=True, null=True) upload_attempt = models.PositiveIntegerField(default=0, blank=True) - upload_status = models.CharField(max_length=10, default='', blank=True) + upload_status = models.CharField(max_length=10, choices=STATUSES, default='pending') upload_msg = models.CharField(max_length=255, default='', blank=True) last_update_datetime = models.DateTimeField(auto_now=True) + class Meta: + verbose_name = _('Forward File') + indexes = [ + models.Index(fields=['connecteur'] , name='ff_connecteur_idx'), + models.Index(fields=['collectivite'] , name='ff_collectivite_idx'), + models.Index(fields=['numero_demande', 'numero_dossier'], name='ff_deman_doss_idx'), + models.Index(fields=['numero_demande'], name='ff_demande_idx'), + models.Index(fields=['numero_dossier'], name='ff_dossier_idx'), + models.Index(fields=['orig_filename'] , name='ff_filename_idx'), + models.Index(fields=['upload_status'] , name='ff_status_idx'), + models.Index(fields=['last_update_datetime'], name='ff_last_up_dt_idx') + ] + ordering = ['-last_update_datetime'] -class AtrealOpenads(BaseResource, HTTPResource): + def get_status(self, status_codename=None): + """Return the upload status human name translated. + If specified codename is not found, return it. + """ + if not status_codename: + status_codename = self.upload_status + for st in self.STATUSES: + if st[0] == status_codename: + return st[1] + return status_codename + + @force_encoded_string_output + def __repr__(self): + return u'ForwardFile(id=%s,connecteur=%s,collectivite=%s,demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % ( + self.id, + unicode(self.connecteur) if hasattr(self, 'connecteur') else None, + unicode(self.collectivite) if hasattr(self, 'collectivite') else None, + self.numero_demande, self.numero_dossier, + self.type_fichier, self.orig_filename, self.upload_status) + + def __unicode__(self): + return u"%s[%s]" % (trunc_str_values(self.orig_filename, 20), self.get_status()) + + def get_url_params(self, *args, **kwargs): + params = super(ForwardFile, self).get_url_params(*args, **kwargs) + params['connecteur'] = self.connecteur.slug if self.connecteur else None + return params + + def update_content_type(self, only_if_empty=False): + """Update the content type from the content of the file.""" + if not self.content_type or not only_if_empty: + if self.upload_file and self.upload_file.size: + self.content_type = magic.from_buffer(self.upload_file.read(1024), mime=True) + else: + self.content_type = '' + + def update_file_hash(self, only_if_empty=False): + """Update the file_hash field from the content of the file.""" + if not self.file_hash or not only_if_empty: + if self.upload_file and self.upload_file.size: + self.file_hash = get_file_digest(self.upload_file) + else: + self.file_hash = '' + + # preprocessing data and validate model before saving + # /!\ Attention: this will not be triggered when doing bulk actions like with QuerySet.update() + # @see: https://docs.djangoproject.com/en/2.2/topics/db/models/#overriding-predefined-model-methods + # The note entitled "Overridden model methods are not called on bulk operations" + def save(self, *args, **kwargs): + # delete file content (on success) + if self.upload_status == 'success': + if self.upload_file and self.upload_file.size > 0: + self.upload_file.delete() + # else, update metadata + else: + self.size = self.upload_file.size if self.upload_file else 0 + self.update_file_hash() + self.update_content_type(only_if_empty=True) + # validation (calling self.clean()) + self.full_clean() + super(ForwardFile, self).save(*args, **kwargs) + + # check that one the following fields must not be blank/null: + # 'file_hash', 'orig_filename', 'upload_file' + # because if they are all empty we dont have any usefull information about the upload + def clean(self, *args, **kwargs): + ret = super(ForwardFile, self).clean(*args, **kwargs) + if (not self.file_hash + and not self.orig_filename + and (not self.upload_file or not self.upload_file.size) + ): + raise ValidationError( + _("A %s cannot have all the following fields empty: %s." % ( + self.get_verbose_name(), + ['file_hash', 'orig_filename', 'upload_file']) + ) + ) + return ret + + +class Collectivite(models.Model, BaseModel): + """Represent a "collectivite".""" + + name = models.CharField(max_length=150, default='', blank=True) + connecteur = models.ForeignKey('AtrealOpenads', + on_delete=models.CASCADE, + related_name="collectivites", + related_query_name="collectivite") + openADS_id = models.PositiveIntegerField(_('openADS identifier'), help_text=_('ex: 3')) + + # 'guichet' will be a property provided by the one-to-one relation of Guichet + # 'forward_files' will be a property provided by the related_name of the foreignKey + + class Meta: + verbose_name = _('Collectivite') + unique_together = ['connecteur', 'openADS_id'] + indexes = [ + models.Index(fields=['connecteur', 'openADS_id'], name='col_conn_openADSid_idx'), + models.Index(fields=['connecteur'], name='col_connecteur_idx'), + models.Index(fields=['openADS_id'], name='col_openADS_id_idx') + ] + ordering = ['name'] + + @classmethod + def get_fields(cls, *args, **kwargs): + # get_fields() return is immutable, hence the copy + fields = [f for f in super(Collectivite, cls).get_fields(*args, **kwargs)] + # moving related fields field at the end of the list + if fields: + rels = [] + for rel_name in ['forward_file', 'guichet']: + if (fields[0] + and hasattr(fields[0], 'name') + and fields[0].name == rel_name + ): + rels.append(fields.pop(0)) + for rel in reversed(rels): + fields.append(rel) + return fields + + @force_encoded_string_output + def __repr__(self): + return u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % ( + self.id, unicode(self.name), + unicode(self.connecteur) if hasattr(self, 'connecteur') else None, + self.openADS_id, + unicode(self.guichet) if hasattr(self, 'guichet') else None) + + def __unicode__(self): + return self.name if isinstance(self.name, unicode) else unicode(self.name) + + def get_fields_kv(self, *args, **kwargs): + fields = super(Collectivite, self).get_fields_kv(*args, **kwargs) + # moving related fields field at the end of the list + if fields: + rels = [] + for rel_name in ['forward_file', 'guichet']: + if (fields[0] and fields[0][0] + and hasattr(fields[0][0], 'name') + and fields[0][0].name == rel_name + ): + rels.append(fields.pop(0)) + for rel in reversed(rels): + fields.append(rel) + return fields + + def get_url_params(self, *args, **kwargs): + params = super(Collectivite, self).get_url_params(*args, **kwargs) + params['connecteur'] = self.connecteur.slug if self.connecteur else None + return params + + +class Guichet(models.Model, BaseModel): + """Represent a "Guichet".""" + + DAYS = [ + (1, _('Monday')), + (2, _('Tuesday')), + (3, _('Wednesday')), + (4, _('Thursday')), + (5, _('Friday')), + (6, _('Saturday')), + (7, _('Sunday')) + ] + + collectivite = models.OneToOneField('Collectivite', + on_delete=models.CASCADE, + related_name="guichet") + ouverture_jour_h = models.TimeField(_('Hour of opening (each day)'), help_text=_('ex: 08:30')) + fermeture_jour_h = models.TimeField(_('Hour of closing (each day)'), help_text=_('ex: 17:00')) + ouverture_sem_d = models.PositiveIntegerField(_('Day of opening (each week)'), help_text=_('ex: Lundi'), choices=DAYS, default=1) + fermeture_sem_d = models.PositiveIntegerField(_('Day of closing (each week)'), help_text=_('ex: Samedi'), choices=DAYS, default=6) + ouverture_sem_h = models.TimeField(_('Hour of opening (each week)'), help_text=_('ex: 08:30')) + fermeture_sem_h = models.TimeField(_('Hour of closing (each week)'), help_text=_('ex: 12:15')) + + class Meta: + verbose_name = _('Guichet') + verbose_name_plural = _('Guichets Urbanisme') + indexes = [ + models.Index(fields=['collectivite'], name='su_collectivite_idx') + ] + ordering = ['collectivite'] + + @force_encoded_string_output + def __repr__(self): + return u'Guichet(id=%s,collectivite=%s,%s)' % ( + self.id, + unicode(self.collectivite) if hasattr(self, 'collectivite') else None, + unicode(self)) + + def __unicode__(self): + return u'%s %s -> %s %s [%s/%s]' % ( + unicode(self.DAYS[self.ouverture_sem_d - 1][1]), + self.ouverture_sem_h.strftime('%H:%M') if self.ouverture_sem_h else None, + unicode(self.DAYS[self.fermeture_sem_d - 1][1]), + self.fermeture_sem_h.strftime('%H:%M') if self.fermeture_sem_h else None, + self.ouverture_jour_h.strftime('%H:%M') if self.ouverture_jour_h else None, + self.fermeture_jour_h.strftime('%H:%M') if self.fermeture_jour_h else None) + + def get_url_params(self, *args, **kwargs): + params = super(Guichet, self).get_url_params(*args, **kwargs) + params['collectivite'] = self.collectivite.id if self.collectivite else None + params['connecteur'] = self.collectivite.connecteur.slug if self.collectivite else None + return params + + def get_list_url(self): + raise Exception(u"Guichet:get_list_url() method should not be called") + + # @raise TypeError if argument is not a datetime object + def is_open(self, dt): + """ Return 'True' if the "Guichet" is open, else False.""" + if dt: + if not isinstance(dt, datetime.datetime): + raise TypeError(u"is_open() expect a datetime object (not a %s)" % type(dt)) + + ouverture_jour_dt = datetime.datetime.combine(dt, self.ouverture_jour_h) + fermeture_jour_dt = datetime.datetime.combine(dt, self.fermeture_jour_h) + day = dt.isoweekday() + return ( + # opening day + (day == self.ouverture_sem_d + and dt.time() > self.ouverture_sem_h and dt < fermeture_jour_dt) + # closing day + or (day == self.fermeture_sem_d + and dt.time() < self.fermeture_sem_h and dt > ouverture_jour_dt) + # regular days + or ( day > self.ouverture_sem_d + and day < self.fermeture_sem_d + and dt > ouverture_jour_dt + and dt < fermeture_jour_dt + ) + ) + + return False + + +class AtrealOpenads(BaseResource, HTTPResource, BaseModel): """API that proxy/relay communications with/to openADS.""" - collectivite = models.CharField(_('Collectivity (identifier)'), max_length=255, - help_text=_('ex: Marseille, or ex: 3'), default='', blank=True) + default_collectivite_openADS_id = models.PositiveIntegerField(_("Default 'collectivite' (identifier in openADS)"), + help_text=_('ex: 3'), default=0, blank=True) openADS_API_url = models.URLField(_('openADS API URL'), max_length=255, help_text=_('ex: https://openads.your_domain.net/api/'), default='') - openADS_API_timeout = 3600 - category = _('Business Process Connectors') + # 'collectivites' will be a property provided by the related_name of the foreignKey + # 'forward_files' will be a property provided by the related_name of the foreignKey api_description = _('''This API provides exchanges with openADS.''') + category = _('Business Process Connectors') + class Meta: verbose_name = _('openADS') + verbose_name_plural = _('openADS') + ordering = ['openADS_API_url'] + @classmethod + def get_class_name_plural(cls, *args, **kwargs): + return cls.get_class_name(*args, **kwargs) + + @force_encoded_string_output + def __repr__(self): + return u'AtrealOpenads(id=%s,openADS=%s,login=%s,collectivites=%s,default=%s)' % ( + self.id, + unicode(self.openADS_API_url), + unicode(self.basic_auth_username), + self.collectivites.count(), + self.default_collectivite_openADS_id) + + def __unicode__(self): + return self.slug if isinstance(self.slug, unicode) else unicode(self.slug) + + def get_url_name(self, prefix=''): + return '%s%s' % (prefix + '-' if prefix else '', 'connector') + + def get_url_params(self, primary_key=True): + params = {'connector': 'atreal-openads'} + if primary_key: + params['slug'] = self.slug + return params + + def get_list_url(self): + raise Exception(u"AtrealOpenads:get_list_url() method should not be called") + + def get_collectivite(self, openADS_id): + """Return the 'collectivite' matching an openADS id.""" + return Collectivite.objects.get(connecteur=self,openADS_id=openADS_id) def log_json_payload(self, payload, title='payload', max_str_len=100): """Log a json paylod surrounded by dashes and with file content filtered.""" @@ -186,7 +399,6 @@ class AtrealOpenads(BaseResource, HTTPResource): self.logger.debug(u"%s", DictDumper(payload, max_str_len)) self.logger.debug(u"----- %s (end) -----", title) - def get_files_from_json_payload(self, payload, title='payload'): """Return files from a JSON payload with all checks and logging.""" @@ -215,7 +427,6 @@ class AtrealOpenads(BaseResource, HTTPResource): # return the files return files - def check_file_dict(self, dict_file, title='payload', b64=True): """Ensure a file dict has all its required items.""" @@ -261,7 +472,6 @@ class AtrealOpenads(BaseResource, HTTPResource): # return the first file return first - @endpoint( description=_("Test an openADS 'connexion'") #~ get={ @@ -273,7 +483,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } #~ } ) - def check_status(self, request=None): + def check_status(self, request=None, *args, **kwargs): """Check avaibility of the openADS.API service.""" url = urlparse.urljoin(self.openADS_API_url, '__api__') response = self.requests.get(url) @@ -282,6 +492,7 @@ class AtrealOpenads(BaseResource, HTTPResource): @endpoint( + perm='can_access', methods=['post'], pattern='^(?P\w+)/?$', example_pattern='{type_dossier}/', @@ -290,14 +501,15 @@ class AtrealOpenads(BaseResource, HTTPResource): 'collectivite': { 'description': _("Use this collectivite (instead of the default one)"), 'example_value': '3' - } + }, + 'now': {'description': _("Datetime (or string formatted to: '%Y-%m-%d %H:%M:%S') against which the 'guichet' is checked for opening"), 'example_value': 'DIA'}, }, post={'description': _("Create an openADS 'dossier'"), 'request_body': { 'schema': { 'application/json': JSON_SCHEMA_CREATE_DOSSIER_IN } - }, + } #~ 'response_body': { #~ 'schema': { #~ 'application/json': JSON_SCHEMA_CREATE_DOSSIER_OUT @@ -305,7 +517,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } } ) - def create_dossier(self, request, type_dossier, collectivite=None): + def create_dossier(self, request, type_dossier, collectivite=None, now=None, *args, **kwargs): # loads the request body as JSON content json_data = json.loads(request.body) @@ -313,8 +525,43 @@ class AtrealOpenads(BaseResource, HTTPResource): # log the request body (filtering the files content) self.log_json_payload(json_data, 'request') + # get the collectivite ID or use the connecteur's default one + collectivite_id = collectivite if collectivite else self.default_collectivite_openADS_id + + # get the collectivite instance + try: + collectivite = self.get_collectivite(collectivite_id) + + # no collectivite instance matching that ID + except Collectivite.DoesNotExist: + pass + + # a collectivite instance was found + else: + + # the collectivite has a guichet + if (hasattr(collectivite, 'guichet') and collectivite.guichet): + + # get the datetime against which the 'guichet' is checked for opening + now_fmt = '%Y-%m-%d %H:%M:%S' + if not now: + now = datetime.datetime.now() + elif isinstance(now, basestring): + now = datetime.datetime.strptime(now, now_fmt) + elif not isinstance(now, datetime.datetime): + raise APIError( + u"Invalid value of type '%s' for now argument of endpoint '%s' " + "(must be: %s)" % ( + type(now), + 'create_dossier', + "datetime or string formatted to '%s'" % now_fmt)) + + # if the guichet is not open + if not collectivite.guichet.is_open(now): + return {'message': _(u"Guichet closed for collectivite '%s'" % collectivite)} + # build the payload - payload = { "collectivite": int(collectivite) if collectivite else int(self.collectivite) } + payload = { "collectivite": int(collectivite_id) } payload["terrain"] = { "numero_voie": normalize(json_data['fields']['terrain_numero_voie']), @@ -362,6 +609,9 @@ class AtrealOpenads(BaseResource, HTTPResource): "nom_voie" : normalize(json_data['fields']['%snom_voie' % prefix]), "code_postal": normalize(json_data['fields']['%scode_postal' % prefix]), "localite" : normalize(json_data['fields']['%slocalite' % prefix]) + }, + "coordonnees": { + "email": normalize(json_data['fields']['%semail' % prefix]) } } @@ -404,11 +654,8 @@ class AtrealOpenads(BaseResource, HTTPResource): # set it as an upload upload_file = ContentFile(content) - # build a hash from the upload - file_hash = self.file_digest(upload_file) - - # build a filename (less than 50 chars) - filename = file_hash[45:] + '.pdf' + # get the file hash + file_hash = get_file_digest(upload_file) # get the content type if specified if 'content_type' in json_data['fields'][k]: @@ -419,9 +666,23 @@ class AtrealOpenads(BaseResource, HTTPResource): self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf') # get the filename if specified + filename = None if 'filename' in json_data['fields'][k]: filename = json_data['fields'][k]['filename'] + # define the file extension + file_extension = get_file_extension(filename, content_type) + + # filename not specified + if not filename: + + # build a filename (less than 50 chars) + filename = file_hash[40:] + file_extension + + # update the specified filename with an extension, if none + elif '.' not in filename: + filename += file_extension + # set the type fichier based on the key (less than 10 chars) type_fichier = re.sub(r'_.*$', '', k)[:10] @@ -477,7 +738,7 @@ class AtrealOpenads(BaseResource, HTTPResource): # decode the recepisse from base 64 try: - recepisse_content = base64.b64decode(recepisse['b64_content']) + base64.b64decode(recepisse['b64_content']) except TypeError: raise APIError('Failed to decode recepisse content from base 64') self.logger.debug("Successfully decoded recepisse from base 64") @@ -502,6 +763,9 @@ class AtrealOpenads(BaseResource, HTTPResource): for f in files: rand_id = base64.urlsafe_b64encode(os.urandom(6)) FF = ForwardFile() + FF.connecteur = self + if isinstance(collectivite, Collectivite): + FF.collectivite = collectivite FF.numero_demande = rand_id FF.numero_dossier = numero_dossier for k in ['type_fichier', 'orig_filename', 'content_type', 'file_hash']: @@ -519,6 +783,7 @@ class AtrealOpenads(BaseResource, HTTPResource): job = self.add_job('upload_user_files', natural_id=numero_dossier, + request=None, type_dossier=type_dossier, numero_dossier=numero_dossier, file_ids=file_ids) @@ -536,11 +801,11 @@ class AtrealOpenads(BaseResource, HTTPResource): 'recepisse' : recepisse } - @endpoint( + perm='can_access', description=_("Get informations about an openADS 'dossier'"), - pattern='^(?P\w+)/?$', - example_pattern='{type_dossier}/', + pattern='^(?P\w+)/(?P\w+)/?$', + example_pattern='{type_dossier}/{numero_dossier}', parameters={ 'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'}, 'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'} @@ -554,7 +819,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } #~ } ) - def get_dossier(self, request, type_dossier, numero_dossier): + def get_dossier(self, request, type_dossier, numero_dossier, *args, **kwargs): # make a request to openADS.API url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s' % (type_dossier, numero_dossier)) @@ -578,27 +843,11 @@ class AtrealOpenads(BaseResource, HTTPResource): # return the response as-is return response.json() - - def upload2ForwardFile(self, path, numero_dossier, type_fichier): - """Convert a file path to a ForwardFile.""" - if path: - rand_id = base64.urlsafe_b64encode(os.urandom(6)) - fwd_file = ForwardFile() - fwd_file.numero_demande = rand_id - fwd_file.numero_dossier = numero_dossier - fwd_file.type_fichier = type_fichier - fwd_file.orig_filename = os.path.basename(path) - fwd_file.content_type = magic.from_file(path, mime=True) - with open(path, 'r') as fp: - fwd_file.file_hash = self.file_digest(fp) - fwd_file.upload_file = File(open(path, 'r')) - fwd_file.upload_status = 'pending' - return fwd_file - return None - - @endpoint( + perm='can_access', description=_("Get informations about the forwarding of user files to openADS"), + pattern='^(?P\w+)/?$', + example_pattern='{numero_dossier}/', parameters={ 'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}, 'fichier_id' : {'description': _("File identifier") , 'example_value': '78'} @@ -612,7 +861,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } #~ } ) - def get_fwd_files(self, request, numero_dossier, fichier_id=None): + def get_fwd_files(self, request, numero_dossier, fichier_id=None, *args, **kwargs): payload = [] fwd_files = [] @@ -651,9 +900,11 @@ class AtrealOpenads(BaseResource, HTTPResource): # return the payload containing the list of files return payload - @endpoint( + perm='can_access', description=_("Get informations about the forwarding of a user file to openADS"), + pattern='^(?P\w+)/?$', + example_pattern='{numero_dossier}/', parameters={ 'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}, 'fichier_id' : {'description': _("File identifier") , 'example_value': '78'} @@ -667,7 +918,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } #~ } ) - def get_fwd_files_status(self, request, numero_dossier, fichier_id=None): + def get_fwd_files_status(self, request, numero_dossier, fichier_id=None, *args, **kwargs): # get all files matching 'numero_dossier' and 'fichier_id' fwd_files = self.get_fwd_files(request, numero_dossier, fichier_id) @@ -695,11 +946,11 @@ class AtrealOpenads(BaseResource, HTTPResource): # respond with the payload return payload - @endpoint( + perm='can_access', description= _("Get a 'courrier' from an openADS 'dossier'"), - pattern='^(?P\w+)/?$', - example_pattern='{type_dossier}/', + pattern='^(?P\w+)/(?P\w+)/(?P\w+)/?$', + example_pattern='{type_dossier}/{numero_dossier}/{lettre_type}', parameters={ 'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'}, 'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}, @@ -714,7 +965,7 @@ class AtrealOpenads(BaseResource, HTTPResource): #~ } #~ } ) - def get_courrier(self, request, type_dossier, numero_dossier, lettre_type): + def get_courrier(self, request, type_dossier, numero_dossier, lettre_type, *args, **kwargs): # make a request to openADS.API url = urlparse.urljoin( @@ -742,14 +993,13 @@ class AtrealOpenads(BaseResource, HTTPResource): # decode the courrier from base 64 try: - courrier_content = base64.b64decode(courrier['b64_content']) + base64.b64decode(courrier['b64_content']) except TypeError: raise APIError('Failed to decode courrier content from base 64') # return the 'courrier' file return {'courrier': courrier} - def get_response_error(self, response): """Return a error string from an HTTP response.""" try: @@ -780,44 +1030,94 @@ class AtrealOpenads(BaseResource, HTTPResource): detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else '' return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '') - + @endpoint( + perm='can_access', + description= _("Trigger the uploading of user's files to openADS"), + pattern='^(?P\w+)/(?P\w+)/?$', + example_pattern='{type_dossier}/{numero_dossier}', + parameters={ + 'type_dossier' : {'description': _("Type of 'dossier'") , 'example_value': 'DIA'}, + 'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}, + 'file_ids' : {'description': _("List of ForwardFile IDs to upload (coma separated)"), 'example_value': '12,18'} + }, +#~ get={ +#~ 'description': _("Trigger the uploading of user's files to openADS"), +#~ 'response_body': { +#~ 'schema': { +#~ 'application/json': JSON_SCHEMA_UPLOAD_USER_FILES +#~ } +#~ } +#~ } + ) # @raise ForwareFile.DoesNotExist if not found - def upload_user_files(self, type_dossier, numero_dossier, file_ids): + def upload_user_files(self, request, type_dossier, numero_dossier, file_ids=None, *args, **kwargs): """A Job to forward user uploaded files to openADS.""" payload = [] fwd_files = [] + if file_ids: + + # if file_ids is a string + if isinstance(file_ids, basestring): + file_ids = [int(fid) for fid in file_ids.split(',')] + + # invalid input + elif not isinstance(file_ids, list): + raise TypeError( + "Invalid 'file_ids' argument type '%s' " + "(must be string or list)" % type(file_ids)) + + # a list of ForwardFile IDs was specified + if file_ids: + fwd_files = ForwardFile.objects.filter(id__in=file_ids).all() + + # check that all ids where found + fwd_files_ids = set([ff.id for ff in fwd_files]) + file_ids_diff = [item for item in file_ids if item not in fwd_files_ids] + if file_ids_diff: + raise ForwardFile.DoesNotExist( + "The following ForwardFile IDs were not found: %s." % file_ids_diff) + + # filter out files not in status 'pending' + fwd_files_filtered = fwd_files.filter(upload_status='pending').all() + fwd_filtered_ids = set([ff.id for ff in fwd_files_filtered]) + file_ids_diff = [item for item in file_ids if item not in fwd_filtered_ids] + if file_ids_diff: + self.logger.warning( + "The following ForwardFile IDs were not in status '%s' " + "when asked specificaly to upload them: %s." % ('pending', file_ids_diff)) + fwd_files = fwd_files_filtered + + # no files_ids where specified + else: + + # process all ForwardFiles of the 'dossier' (in status 'pending') + fwd_files = ForwardFile.objects.filter( + numero_dossier=numero_dossier, + upload_status='pending' + ).all() + # for every file ids specified (in parameters of this job) - for fid in file_ids: - self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fid) + for fwd_file in fwd_files: + self.logger.debug(u"upload_user_files() ForwardFile file_id: %s", fwd_file.id) - # get the matching forward file - fwd_file = ForwardFile.objects.get(id=fid) + # add the file content and data to the payload + payload.append({ + 'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''), + 'content_type' : fwd_file.content_type, + 'b64_content' : base64.b64encode(fwd_file.upload_file.read()), + 'file_type' : fwd_file.type_fichier + }) + self.logger.debug("upload_user_files() payload added") - # found one - if fwd_file: - self.logger.debug("upload_user_files() got ForwardFile") - - # add the file content and data to the payload - payload.append({ - 'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''), - 'content_type' : fwd_file.content_type, - 'b64_content' : base64.b64encode(fwd_file.upload_file.read()), - 'file_type' : fwd_file.type_fichier - }) - self.logger.debug("upload_user_files() payload added") - - # update the file upload data (status and attempts) - fwd_file.upload_status = 'uploading' - fwd_file.upload_attempt += 1 - fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt - self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg) - fwd_file.save() - self.logger.debug("upload_user_files() ForwardFile saved") - - # append the forwarded file to the list - fwd_files.append(fwd_file) + # update the file upload data (status and attempts) + fwd_file.upload_status = 'uploading' + fwd_file.upload_attempt += 1 + fwd_file.upload_msg = 'attempt %s' % fwd_file.upload_attempt + self.logger.debug(u"upload_user_files() upload_msg: '%s'", fwd_file.upload_msg) + fwd_file.save() + self.logger.debug("upload_user_files() ForwardFile saved") # if files need to be forwarded if payload: @@ -837,6 +1137,8 @@ class AtrealOpenads(BaseResource, HTTPResource): # reponse is an error if response.status_code // 100 != 2: + error = self.get_response_error(response) + self.logger.warning(u"Request [POST] '%s' failed with error: '%s'", url, error) # update every files status as 'failed' and save the error message for fwd_file in fwd_files: @@ -852,12 +1154,16 @@ class AtrealOpenads(BaseResource, HTTPResource): file_ids ) + # respond with APIError + if request: + raise APIError(error) + # response is not an error else: # load the reponse as JSON try: - result = response.json() + response.json() # in case of failure except ValueError: @@ -875,6 +1181,10 @@ class AtrealOpenads(BaseResource, HTTPResource): fwd_files ) + # respond with APIError + if request: + raise APIError(u'No JSON content returned: %r' % response.content[:1000]) + # response correctly loaded as JSON else: @@ -885,20 +1195,21 @@ class AtrealOpenads(BaseResource, HTTPResource): fwd_file.upload_status = 'success' fwd_file.upload_msg = 'uploaded successfuly' - # delete file content (on success) + # save the file (content will be deleted automatically) fpath = fwd_file.upload_file.path - fwd_file.upload_file.delete() - - # save the file fwd_file.save() # log the success message self.logger.debug( - u"upload_user_files() flaging file '%s' has transfered (deleted '%s')", + u"upload_user_files() flaging file '%s' as 'transfered' (deleted '%s')", fwd_file.id, fpath ) + # respond with success + if request: + return {'message': 'all files transfered successfully'} + # no file need to be forwarded else: self.logger.warning( @@ -907,15 +1218,6 @@ class AtrealOpenads(BaseResource, HTTPResource): file_ids ) - - # copy-pasted from 'wcs/qommon/misc.py' - def file_digest(self, content, chunk_size=100000): - """Return a hash for the content specified.""" - digest = hashlib.sha256() - content.seek(0) - def read_chunk(): - return content.read(chunk_size) - for chunk in iter(read_chunk, ''): - digest.update(chunk) - return digest.hexdigest() - + # respond with message + if request: + return {'message': 'no file to transfer'} diff --git a/atreal_openads/static/css/atreal_openads.css b/atreal_openads/static/css/atreal_openads.css new file mode 100644 index 0000000..2029bb3 --- /dev/null +++ b/atreal_openads/static/css/atreal_openads.css @@ -0,0 +1,9 @@ +/* overrides for atreal_openads */ + +div#header h1.breadcrumbs a:last-child { + display: inline-block; +} + +.connecteur-collectivites .collectivite .service-urbanisme { + margin-left: 0.1em; +} diff --git a/atreal_openads/templates/atreal_openads/manage/collectivite_form.html b/atreal_openads/templates/atreal_openads/manage/collectivite_form.html new file mode 100644 index 0000000..5a581cc --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/collectivite_form.html @@ -0,0 +1,33 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.connecteur }} - {{ object.name }}{% else %}{% trans 'New' %}{% endif %}

+{% endblock %} + +{% block content %} + +
+
+ {% csrf_token %} + {{ form|with_template }} +
+ {% block buttons %} +
+ + +
+ {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/collectivite_list.html b/atreal_openads/templates/atreal_openads/manage/collectivite_list.html new file mode 100644 index 0000000..90c3e10 --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/collectivite_list.html @@ -0,0 +1,99 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name_plural }}

+ + {% if collectivite_add_url %} + {% trans "Add a Collectivite" %} + {% endif %} + +{% endblock %} + +{% block content %} + + + {% block list %} + {% if object_list %} + + + + {% for field in view.model.get_fields %} + {% if field.name != "connecteur" %} + + {% endif %} + {% endfor %} + + + + {% for col in object_list %} + + + {% for field, value in col.get_fields_kv %} + {% if field.name != "connecteur" %} + + {% endif %} + {% endfor %} + + {% endfor %} + + {% if is_paginated %} + + + + + {% endif %} + {% else %} + + + + {% endif %} + {% endblock %} +
{% trans "Actions" %} + {% if field.name == "guichet" %}{% trans "Guichet" %} + {% elif field.name == "forward_file" %}{% trans "Forward Files" %} + {% else %}{% trans field.verbose_name %} + {% endif %} +
+ {% trans "edit" %} + {% trans "delete" %} + + {% if field.name != "guichet" and field.name != "name" and field.name != "forward_file" %} + {{value}} + {% elif field.name == "name" %} + {{value}} + {% elif field.name == "guichet" %} + {% if value %}{{value}}{% else %}{% trans "None" %}{% endif %} + {% elif field.name == "forward_file" %} + {% if value %}{{ value.count }} {% trans "forward file(s)" %}{% else %}{% trans "None" %}{% endif %} + {% endif %} +
+ + {% if page_obj.has_previous %} + « {% trans "first" %} + {% trans "previous" %} + {% endif %} + + + {% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }} + + + {% if page_obj.has_next %} + {% trans "next" %} + {% trans "last" %} » + {% endif %} + + +
{% trans "No data" %}
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/collectivite_view.html b/atreal_openads/templates/atreal_openads/manage/collectivite_view.html new file mode 100644 index 0000000..abbe50d --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/collectivite_view.html @@ -0,0 +1,63 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }}{% if object.name %} - {{ object.name }}{% endif %}

+ + {% if object|can_delete:request.user %} + {% trans "delete" %} + {% endif %} + {% if object|can_edit:request.user %} + {% trans "edit" %} + {% endif %} + {% if not object.guichet and object|can_edit:request.user and guichet_add_url %} + {% trans "Add a Guichet" %} + {% endif %} + +{% endblock %} + +{% block content %} + +
+ {% block description %} + + {% for field, value in object.get_fields_kv %} +

+ {% if field.name == "guichet" %} + {% trans "Guichet" %} {% trans ":" %} + {% if value %} + {{value}} + {% else %} + {% trans "None" %} + {% endif %} + {% elif field.name != "connecteur" and field.name != "forward_file" and field.name != "id" and field.name != "name" %} + {% trans field.verbose_name %} {% trans ":" %} {{value}} + {% endif %} +

+ {% endfor %} + {% if object.forward_files.count %} +

+ {% trans "Forward files" %} + {% trans ":" %} + + {{ object.forward_files.count }} + {% if object.forward_files.count == 1 %}{% trans "forward file" %} + {% else %}{% trans "forward files" %} + {% endif %} + +

+ {% endif %} + {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/connector_view.html b/atreal_openads/templates/atreal_openads/manage/connector_view.html new file mode 100644 index 0000000..2bd54da --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/connector_view.html @@ -0,0 +1,77 @@ +{% extends "passerelle/manage/service_view.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} - {{ object.title }} + {% with status=object.get_availability_status %} + {% if status %} + {% if status.down %}{% trans 'Down' %}{% endif %} + {% endif %} + {% endwith %} +

+ +{% if object|can_edit:request.user and has_check_status %} +{% trans 'availability check parameters' %} +{% endif %} +{% if object|can_edit:request.user %} +{% trans 'logging parameters' %} +{% endif %} +{% if object|can_delete:request.user %} +{% trans 'delete' %} +{% endif %} +{% if object|can_edit:request.user %} +{% trans 'edit' %} +{% endif %} +{% if object|can_edit:request.user and collectivite_add_url%} +{% trans "Add a collectivite" %} +{% endif %} + +{% endblock %} + +{% block description %} + {{ block.super }} + {% if object.id %} + {% if object.collectivites.count %} + {% if object.collectivites.count == 1 %} +

+ {% trans 'Collectivites' %} + {% trans ":" %} + {{ object.collectivites.first }} +

+ {% elif object.collectivites.count < 10 %} +

{% trans 'Collectivites' %}

+
    + {% for col in object.collectivites.all %} +
  • {{ col.name }} + {% if col.guichet %} ( {{ col.guichet }} ) {% endif %} +
  • + {% endfor %} +
+ {% else %} +

+ {% trans 'Collectivites' %} + {% trans ":" %} + {{ object.collectivites.count }} {% trans 'collectivites' %} +

+ {% endif %} + {% endif %} + {% if object.forward_files.count %} +

+ {% trans "Forward files" %} + {% trans ":" %} + + {{ object.forward_files.count }} + {% if object.forward_files.count == 1 %}{% trans "forward file" %} + {% else %}{% trans "forward files" %} + {% endif %} + +

+ {% endif %} + {% endif %} +{% endblock %} + diff --git a/atreal_openads/templates/atreal_openads/manage/forwardfile_form.html b/atreal_openads/templates/atreal_openads/manage/forwardfile_form.html new file mode 100644 index 0000000..cb72241 --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/forwardfile_form.html @@ -0,0 +1,34 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} + {% if collectivite %}{{ collectivite.name }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}

+{% endblock %} + +{% block content %} + +
+
+ {% csrf_token %} + {{ form|with_template }} +
+ {% block buttons %} +
+ + +
+ {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/forwardfile_list.html b/atreal_openads/templates/atreal_openads/manage/forwardfile_list.html new file mode 100644 index 0000000..2bb4bc9 --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/forwardfile_list.html @@ -0,0 +1,92 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} + {% if collectivite %}{{ collectivite.name }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} - {% trans 'list' %}

+{% endblock %} + +{% block content %} + + + {% block list %} + {% if object_list %} + + + + {% for field in view.model.get_fields %} + {% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %} + + {% endif %} + {% endfor %} + + + + {% for ff in object_list %} + + + {% for field, value in ff.get_fields_kv %} + {% if field.name != "connecteur" and field.name != "file_hash" and field.name != "orig_filename" %} + + {% endif %} + {% endfor %} + + {% endfor %} + + {% if is_paginated %} + + + + + {% endif %} + {% else %} + + + + {% endif %} + {% endblock %} +
{% trans "Actions" %}{% trans field.verbose_name %}
+ {% trans "edit" %} + {% trans "delete" %} + + {% if field.name != "collectivite" and field.name != "upload_file" %} + {{value}} + {% elif field.name == "collectivite" %} + {% if value %}{{value}} + {% else %}{% trans "None" %} + {% endif %} + {% elif field.name == "upload_file" %} + {% if value %}{{value}} + {% else %}{% trans "None" %} + {% endif %} + {% endif %} +
+ + {% if page_obj.has_previous %} + « {% trans "first" %} + {% trans "previous" %} + {% endif %} + + + {% trans "Page" %} {{ page_obj.number }} / {{ page_obj.paginator.num_pages }} + + + {% if page_obj.has_next %} + {% trans "next" %} + {% trans "last" %} » + {% endif %} + + +
{% trans "No data" %}
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/forwardfile_view.html b/atreal_openads/templates/atreal_openads/manage/forwardfile_view.html new file mode 100644 index 0000000..930e732 --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/forwardfile_view.html @@ -0,0 +1,56 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} + {% if collectivite %}{{ collectivite.name }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} - {% if object.id %}{{ object.numero_dossier }} - {{ object.type_fichier }}{% endif %}

+ + {% if object|can_delete:request.user %} + {% trans "delete" %} + {% endif %} + {% if object|can_edit:request.user %} + {% trans "edit" %} + {% endif %} + +{% endblock %} + +{% block content %} + +
+ {% block description %} + + {% for field, value in object.get_fields_kv %} +

+ {% if field.name == "collectivite" %} + {% trans "Collectivite" %} {% trans ":" %} + {% if value %} + {{value}} + {% else %} + {% trans "None" %} + {% endif %} + {% elif field.name == "upload_file" %} + {% trans "File" %} {% trans ":" %} + {% if value %} + {{value}} + {% else %} + {% trans "None" %} + {% endif %} + {% elif field.name != "connecteur" and field.name != "id" %} + {% trans field.verbose_name %} {% trans ":" %} {{value}} + {% endif %} +

+ {% endfor %} + {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/guichet_form.html b/atreal_openads/templates/atreal_openads/manage/guichet_form.html new file mode 100644 index 0000000..5784f3c --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/guichet_form.html @@ -0,0 +1,34 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} + {% if collectivite %}{{ collectivite.name }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }} {% if object.id and object.collectivite %} - {{ object.collectivite }}{% else %}{% trans 'New' %}{% endif %}

+{% endblock %} + +{% block content %} + +
+
+ {% csrf_token %} + {{ form|with_template }} +
+ {% block buttons %} +
+ + +
+ {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/templates/atreal_openads/manage/guichet_view.html b/atreal_openads/templates/atreal_openads/manage/guichet_view.html new file mode 100644 index 0000000..bcf42f6 --- /dev/null +++ b/atreal_openads/templates/atreal_openads/manage/guichet_view.html @@ -0,0 +1,44 @@ +{% extends "passerelle/manage.html" %} +{% load i18n passerelle gadjo staticfiles %} + +{% block extrascripts %} + {{ block.super }} + +{% endblock %} + +{% block breadcrumb %} + {{ block.super }} + {% if connecteur %}{{ connecteur.slug }}{% endif %} + {% if collectivite %}{{ collectivite.name }}{% endif %} +{% endblock %} + +{% block appbar %} +

{{ view.model.get_verbose_name }}

+ + {% if object|can_delete:request.user %} + {% trans "delete" %} + {% endif %} + {% if object|can_edit:request.user %} + {% trans "edit" %} + {% endif %} + +{% endblock %} + +{% block content %} + +
+ {% block description %} + + {% for field, value in object.get_fields_kv %} + {% if field.name != "id" %} +

+ {% if field.name != "collectivite" %} + {% trans field.verbose_name %} {% trans ":" %} {{value}} + {% endif %} +

+ {% endif %} + {% endfor %} + {% endblock %} +
+ +{% endblock %} diff --git a/atreal_openads/urls.py b/atreal_openads/urls.py new file mode 100644 index 0000000..76300a6 --- /dev/null +++ b/atreal_openads/urls.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import re + +from django.conf.urls import url + +from .views import ( + AtrealOpenadsView, + ForwardFileView, + ForwardFileListView, + ForwardFileUpdateView, + ForwardFileDeleteView, + CollectiviteView, + CollectiviteListView, + CollectiviteCreateView, + CollectiviteUpdateView, + CollectiviteDeleteView, + GuichetView, + GuichetCreateView, + GuichetUpdateView, + GuichetDeleteView +) + + +urlpatterns = [ + url(r'^(?P[\w,-]+)/$', AtrealOpenadsView.as_view(), name='view-connector') +] + +management_urlpatterns = [] + +for view in [ + ForwardFileView, + ForwardFileListView, + ForwardFileUpdateView, + ForwardFileDeleteView, + CollectiviteView, + CollectiviteListView, + CollectiviteCreateView, + CollectiviteUpdateView, + CollectiviteDeleteView, + GuichetView, + GuichetCreateView, + GuichetUpdateView, + GuichetDeleteView +]: + view_class_name = str(view.__name__) + m = re.search(r'^.*(Create|Update|Delete|List)View$', view_class_name) + if m: + view_action = m.group(1).lower() + else: + view_action = 'view' + + # no prefix for action 'view' + url_prefix = view_action.replace('update', 'edit') + '-' + + regex_base = r'^(?P[\w,-]+)/' + regex_pkey = '/(?P[\w,-]+)' + + url_name = url_prefix + view.model.get_class_name_dash_case() + regex_url = '%s%s' % (url_prefix if view_action != 'view' else '', + view.model.get_class_name_dash_case()) + + # no primary key for action 'create' and 'list' + if view_action in ['create', 'list']: + regex_pkey = '' + + # plural form of the url for action 'list' and no prefix + if view_action == 'list': + url_name = url_prefix + view.model.get_class_name_plural_dash_case() + regex_url = view.model.get_class_name_plural_dash_case() + + # for 'guichet' prefix the regex by the collectivite + if view.model.get_class_name() == 'Guichet': + regex_base += 'collectivite/(?P[\w,-]+)/' + + # build the regex + regex = regex_base + regex_url + regex_pkey + '$' + + # add the url pattern to the management list + management_urlpatterns += [url(regex, view.as_view(), name=url_name)] + +# add the ForwardFile 'list' url patterns for Collectivite +ff_list_regex_url = ForwardFileListView.model.get_class_name_plural_dash_case() +management_urlpatterns += [ + url( + r'^(?P[\w,-]+)/collectivite/(?P[\w,-]+)/' + ff_list_regex_url + '$', + ForwardFileListView.as_view(), + name='col-list-' + ff_list_regex_url + ) +] diff --git a/atreal_openads/utils.py b/atreal_openads/utils.py new file mode 100644 index 0000000..697943e --- /dev/null +++ b/atreal_openads/utils.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2018 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import base64 +import datetime +import re +import hashlib +import copy +import mimetypes +import sys + +from HTMLParser import HTMLParser + +from django.urls import reverse_lazy + + +def to_dash_case(camel_str): + s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', camel_str) + return re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1).lower() + + +# from: https://stackoverflow.com/a/13848698 +def force_encoded_string_output(func, default_enc='utf-8'): + """Decorator function that return the result converted to str type.""" + if sys.version_info.major < 3: + def _func(*args, **kwargs): + return func(*args, **kwargs).encode(sys.stdout.encoding or default_enc) + return _func + else: + return func + + +class MLStripper(HTMLParser): + """HTML parser that removes html tags.""" + def __init__(self): + self.reset() + self.fed = [] + def handle_data(self, d): + self.fed.append(d) + def get_data(self): + return ''.join(self.fed) + + +def strip_tags(html): + """Remove html tags from a string.""" + s = MLStripper() + s.feed(html) + return s.get_data() + + +def clean_spaces(text): + """Remove extra spaces an line breaks from a string.""" + text = text.replace('\n', ' ') + text = text.replace('\r', ' ') + text = text.replace('\t', ' ') + text = text.replace('\\n', ' ') + text = text.replace('\\r', ' ') + text = text.replace('\\t', ' ') + return re.sub(r' +', ' ', text).strip() + + +def normalize(value): + """Normalize a value to be send to openADS.API.""" + if value is None: + return '' + if not isinstance(value, unicode): + value = unicode(value) + return clean_spaces(value) + + +def get_file_data(path, b64=True): + """Return the content of a file as a string, in base64 if specified.""" + with open(path, 'r') as f: + if b64: + return base64.b64encode(f.read()) + return f.read() + + +# copy-pasted from 'wcs/qommon/misc.py' +def get_file_digest(content, chunk_size=100000): + """Return a hash for the content specified.""" + digest = hashlib.sha256() + content.seek(0) + def read_chunk(): + return content.read(chunk_size) + for chunk in iter(read_chunk, ''): + digest.update(chunk) + return digest.hexdigest() + + +def get_upload_path(instance, filename=None): + """Return a relative upload path for a file.""" + fn_ref = instance.orig_filename if instance.orig_filename else filename + # file_hash and content_type attribute are updated on file save() + # so if the file was not yet saved, it may have those attributes undefined + # this is why we update them here, if they are empty + instance.update_file_hash(only_if_empty=True) + instance.update_content_type(only_if_empty=True) + # be careful: + # * openADS accept only filename less than 50 chars + # * name should be unique, even if the content is the same + return 'to_openADS__%s__%s%s' % ( + datetime.datetime.now().strftime('%Y-%m-%d_%Hh%Mm%Ss%f'), + instance.file_hash[:4], + get_file_extension(fn_ref, instance.content_type)[:5] + ) + + +def get_file_extension(filename, mimetype=None): + """Return the extension of the file, according to its filename or specified mimetype.""" + file_extension = None + if filename and '.' in filename: + file_extension = re.sub(r'^.*\.', '.', filename) + elif mimetype: + file_extension = mimetypes.guess_extension(mimetype) + return file_extension if file_extension else '' + + +def trunc_str_values(value, limit, visited=None, truncate_text=u'…'): + """Truncate a string value (not dict keys) and append a truncate text.""" + + if visited is None: + visited = [] + if not value in visited: + if isinstance(value, basestring) and len(value) > limit: + value = value[:limit] + truncate_text + elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple): + visited.append(value) + iterator = value.iteritems() if isinstance(value, dict) else enumerate(value) + for k,v in iterator: + value[k] = trunc_str_values(v, limit, visited, truncate_text) + return value + + +class DictDumper(object): + """Helper to dump a dictionary to a string representation with lazy processing. + + Only applied when dict is converted to string (lazy processing): + - long strings truncated (after the dict has been 'deep' copied) + - (optionaly) dict converted with json.dumps instead of unicode(). + """ + + def __init__(self, dic, max_str_len=255, use_json_dumps=True): + """ arguments: + - dic string the dict to dump + - max_str_len integer the maximul length of string values + - use_json_dumps boolean True to use json.dumps() else it uses unicode() + """ + self.dic = dic + self.max_str_len = max_str_len + self.use_json_dumps = use_json_dumps + + @force_encoded_string_output + def __repr__(self): + return u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % ( + self.dic, self.max_str_len, self.use_json_dumps) + + @force_encoded_string_output + def __str__(self): + return unicode(self) + + def __unicode__(self): + dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len) + dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc + return unicode(dict_ref) + + +class BaseModel(object): + """A class that provide basic usefull functions. + Intended for all models to extends it. + """ + + @classmethod + def get_verbose_name(cls): + """Return the verbose name of the class (helper for META option).""" + return cls._meta.verbose_name + + @classmethod + def get_verbose_name_plural(cls): + """Return the plural form of the verbose name of the class (helper for META option).""" + return cls._meta.verbose_name_plural + + @classmethod + def get_class_name(cls): + return cls.__name__ + + @classmethod + def get_class_name_plural(cls): + return cls.get_class_name() + 's' + + @classmethod + def get_class_name_dash_case(cls): + return to_dash_case(cls.get_class_name()) + + @classmethod + def get_class_name_plural_dash_case(cls): + return to_dash_case(cls.get_class_name_plural()) + + @classmethod + def get_class_name_title(cls): + return cls.get_class_name_dash_case().replace('-', ' ').title() + + @classmethod + def get_class_name_plural_title(cls): + return cls.get_class_name_plural_dash_case().replace('-', ' ').title() + + @classmethod + def get_fields(cls): + """Return the fields of the class (helper for META option).""" + return cls._meta.get_fields(include_parents=True, include_hidden=False) + + @force_encoded_string_output + def __str__(self): + return unicode(self) + + # mainly for the view + def get_fields_kv(self): + """Return the model's list of field's key value.""" + return [(field, getattr(self, field.name, None)) for field in self._meta.get_fields()] + + def get_url_name(self, prefix='', plural=False): + class_name_dash_case = self.__class__.get_class_name_dash_case() + if plural: + class_name_dash_case = self.__class__.get_class_name_plural_dash_case() + return '%s%s' % (prefix + '-' if prefix else '', class_name_dash_case) + + def get_url_params(self, primary_key=True): + return {'pk': self.id} if primary_key else {} + + def get_absolute_url(self): + return reverse_lazy(self.get_url_name('view'), kwargs=self.get_url_params()) + + def get_edit_url(self): + return reverse_lazy(self.get_url_name('edit'), kwargs=self.get_url_params()) + + def get_delete_url(self): + return reverse_lazy(self.get_url_name('delete'), kwargs=self.get_url_params()) + + def get_list_url(self): + return reverse_lazy(self.get_url_name('list', True), kwargs=self.get_url_params(False)) + diff --git a/atreal_openads/views.py b/atreal_openads/views.py new file mode 100644 index 0000000..790c044 --- /dev/null +++ b/atreal_openads/views.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from django.urls import reverse_lazy + +from django.views.generic.detail import DetailView +from django.views.generic.list import ListView +from django.views.generic.edit import CreateView, UpdateView, DeleteView + +from passerelle.views import GenericConnectorView + +from .models import ForwardFile, Collectivite, Guichet, AtrealOpenads +from .forms import ForwardFileForm, CollectiviteForm, GuichetForm + + +def get_connecteur_from_request(view, key='connecteur'): + """Return the 'connecteur' object from the view object.""" + if not hasattr(view, 'connecteur') or not view.connecteur and view.request: + connecteur_slug = view.request.resolver_match.kwargs.get(key, None) + if connecteur_slug: + view.connecteur = AtrealOpenads.objects.get(slug=connecteur_slug) + return view.connecteur if hasattr(view, 'connecteur') else None + + +def get_collectivite_from_request(view, key='collectivite'): + """Return the 'collectivite' object from the view object.""" + if not hasattr(view, 'collectivite') or not view.collectivite and view.request: + collectivite_id = view.request.resolver_match.kwargs.get(key, None) + if collectivite_id: + view.collectivite = Collectivite.objects.get(id=collectivite_id) + return view.collectivite if hasattr(view, 'collectivite') else None + + +class ForwardFileView(DetailView): + model = ForwardFile + template_name = 'atreal_openads/manage/forwardfile_view.html' + + def get_context_data(self, *args, **kwargs): + context = super(ForwardFileView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + return context + + +class ForwardFileListView(ListView): + model = ForwardFile + template_name = 'atreal_openads/manage/forwardfile_list.html' + paginate_by = 50 + ordering = 'id' + + def get_queryset(self): + qset = None + + collectivite = get_collectivite_from_request(self) + if collectivite: + qset = super(ForwardFileListView, self).get_queryset().filter( + connecteur=get_connecteur_from_request(self), + collectivite=collectivite) + else: + qset = super(ForwardFileListView, self).get_queryset().filter( + connecteur=get_connecteur_from_request(self)) + + order_by = None + order_by_param = self.request.GET.get('order-by', None) + if order_by_param: + fields_names = [f.name for f in self.model.get_fields()] + order_by_field = order_by_param[1:] if order_by_param[0] == '-' else order_by_param + if order_by_field in fields_names: + order_by = order_by_param + + return qset.order_by(order_by) if order_by else qset # qset.order_by() + + def get_context_data(self, *args, **kwargs): + context = super(ForwardFileListView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + +class ForwardFileUpdateView(UpdateView): + model = ForwardFile + form_class = ForwardFileForm + template_name = 'atreal_openads/manage/forwardfile_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(ForwardFileUpdateView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + def get_success_url(self, *args, **kwargs): + back_to = self.request.GET.get('back-to') + if back_to == 'list-forward-files': + return reverse_lazy('list-forward-files', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug + }) + elif back_to == 'col-list-forward-files': + obj = self.get_object() + if obj.collectivite: + return reverse_lazy('col-list-forward-files', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug, + 'collectivite': obj.collectivite.id + }) + return self.get_object().get_absolute_url() + + +class ForwardFileDeleteView(DeleteView): + model = ForwardFile + form_class = ForwardFileForm + template_name = 'atreal_openads/manage/forwardfile_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(ForwardFileDeleteView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + def get_success_url(self, *args, **kwargs): + back_to = self.request.GET.get('back-to') + if back_to == 'list-forward-files': + return reverse_lazy('list-forward-files', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug + }) + elif back_to == 'col-list-forward-files': + obj = self.get_object() + if obj.collectivite: + return reverse_lazy('col-list-forward-files', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug, + 'collectivite': obj.collectivite.id + }) + return reverse_lazy('view-connector', kwargs={ + 'connector': 'atreal-openads', + 'slug' : get_connecteur_from_request(self).slug + }) + + +class CollectiviteView(DetailView): + model = Collectivite + template_name = 'atreal_openads/manage/collectivite_view.html' + + def get_context_data(self, *args, **kwargs): + context = super(CollectiviteView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['guichet_add_url'] = reverse_lazy('create-guichet', kwargs={ + 'connecteur' : context['connecteur'].slug, + 'collectivite': self.get_object().id}) + context['forward_files_list_url'] = reverse_lazy('col-list-forward-files', kwargs={ + 'connecteur' : context['connecteur'].slug, + 'collectivite': self.get_object().id}) + return context + + +class CollectiviteListView(ListView): + model = Collectivite + template_name = 'atreal_openads/manage/collectivite_list.html' + paginate_by = 50 + ordering = 'id' + + def get_queryset(self): + qset = super(CollectiviteListView, self).get_queryset().filter( + connecteur=get_connecteur_from_request(self)) + + order_by = None + order_by_param = self.request.GET.get('order-by', None) + if order_by_param: + fields_names = [f.name for f in self.model.get_fields()] + order_by_field = order_by_param[1:] if order_by_param[0] == '-' else order_by_param + if order_by_field in fields_names: + order_by = order_by_param + + return qset.order_by(order_by) if order_by else qset # qset.order_by() + + def get_context_data(self, *args, **kwargs): + context = super(CollectiviteListView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={ + 'connecteur': context['connecteur'].slug}) + return context + + +class CollectiviteCreateView(CreateView): + model = Collectivite + form_class = CollectiviteForm + template_name = 'atreal_openads/manage/collectivite_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(CollectiviteCreateView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + return context + + def get_form_kwargs(self): + kwargs = super(CollectiviteCreateView, self).get_form_kwargs() + kwargs['connecteur'] = get_connecteur_from_request(self) + return kwargs + + def get_success_url(self, *args, **kwargs): + if self.request.GET.get('back-to') == 'list-collectivites': + return reverse_lazy('list-collectivites', kwargs={ + 'connecteur' : get_connecteur_from_request(self).slug + }) + return reverse_lazy('view-connector', kwargs={ + 'connector': 'atreal-openads', + 'slug' : get_connecteur_from_request(self).slug + }) + + +class CollectiviteUpdateView(UpdateView): + model = Collectivite + form_class = CollectiviteForm + template_name = 'atreal_openads/manage/collectivite_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(CollectiviteUpdateView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + return context + + def get_success_url(self, *args, **kwargs): + if self.request.GET.get('back-to') == 'list-collectivites': + return reverse_lazy('list-collectivites', kwargs={ + 'connecteur' : get_connecteur_from_request(self).slug + }) + return self.get_object().get_absolute_url() + + +class CollectiviteDeleteView(DeleteView): + model = Collectivite + form_class = CollectiviteForm + template_name = 'atreal_openads/manage/collectivite_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(CollectiviteDeleteView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + return context + + def get_success_url(self, *args, **kwargs): + if self.request.GET.get('back-to') == 'list-collectivites': + return reverse_lazy('list-collectivites', kwargs={ + 'connecteur' : get_connecteur_from_request(self).slug + }) + return reverse_lazy('view-connector', kwargs={ + 'connector': 'atreal-openads', + 'slug' : get_connecteur_from_request(self).slug + }) + + +class GuichetView(DetailView): + model = Guichet + template_name = 'atreal_openads/manage/guichet_view.html' + + def get_context_data(self, *args, **kwargs): + context = super(GuichetView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + +class GuichetCreateView(CreateView): + model = Guichet + form_class = GuichetForm + template_name = 'atreal_openads/manage/guichet_form.html' + + def get_form_kwargs(self): + kwargs = super(GuichetCreateView, self).get_form_kwargs() + kwargs['collectivite'] = get_collectivite_from_request(self) + return kwargs + + def get_context_data(self, *args, **kwargs): + context = super(GuichetCreateView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + def get_success_url(self): + return reverse_lazy('view-collectivite', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug, + 'pk' : get_collectivite_from_request(self).id + }) + + +class GuichetUpdateView(UpdateView): + model = Guichet + form_class = GuichetForm + template_name = 'atreal_openads/manage/guichet_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(GuichetUpdateView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + +class GuichetDeleteView(DeleteView): + model = Guichet + form_class = GuichetForm + template_name = 'atreal_openads/manage/guichet_form.html' + + def get_context_data(self, *args, **kwargs): + context = super(GuichetDeleteView, self).get_context_data(*args, **kwargs) + context['connecteur'] = get_connecteur_from_request(self) + context['collectivite'] = get_collectivite_from_request(self) + return context + + def get_success_url(self, *args, **kwargs): + return reverse_lazy('view-collectivite', kwargs={ + 'connecteur': get_connecteur_from_request(self).slug, + 'pk' : get_collectivite_from_request(self).id + }) + + +class AtrealOpenadsView(GenericConnectorView): + model = AtrealOpenads + template_name = 'atreal_openads/manage/connector_view.html' + + def get_context_data(self, *args, **kwargs): + context = super(AtrealOpenadsView, self).get_context_data(*args, **kwargs) + context['collectivite_fields'] = Collectivite.get_fields() + context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={ + 'connecteur': self.get_object().slug}) + return context + diff --git a/tests/test_atreal_openads.py b/tests/test_atreal_openads.py index 80e2252..ad3544c 100644 --- a/tests/test_atreal_openads.py +++ b/tests/test_atreal_openads.py @@ -12,32 +12,39 @@ import json import os import base64 import re +import datetime +import magic + from requests import Response from django.http import Http404 from django.http.request import HttpRequest, QueryDict from django.http.response import JsonResponse from django.core.files import File +from django.core.files.base import ContentFile +#from django.db.models.query import QuerySet +from django.core.exceptions import ValidationError from passerelle.utils.jsonresponse import APIError from passerelle.base.models import Job -from atreal_openads.models import ( - strip_tags, - clean_spaces, - normalize, +from atreal_openads.utils import ( get_file_data, - get_upload_path, - trunc_str_values, - DictDumper, - AtrealOpenads, - ForwardFile + get_file_digest, + trunc_str_values +) + +from atreal_openads.models import ( + ForwardFile, + Guichet, + Collectivite, + AtrealOpenads ) CONNECTOR_NAME = 'atreal-openads' CONNECTOR_SLUG = 'atreal' -COLLECTIVITE = 3 +COLLECTIVITE = 79 OPENADS_API_LOGIN = 'publik-passerelle' OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20)) OPENADS_API_URL = 'http://openads.api/' @@ -55,114 +62,221 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf') def atreal_openads(db): return AtrealOpenads.objects.create( slug = CONNECTOR_SLUG, - collectivite = COLLECTIVITE, + default_collectivite_openADS_id = COLLECTIVITE, openADS_API_url = OPENADS_API_URL, basic_auth_username = OPENADS_API_LOGIN, basic_auth_password = OPENADS_API_PASSWORD ) +@pytest.fixture +def collectivite_1(db, atreal_openads): + return Collectivite.objects.create( + name = u'Macollectivité', + connecteur = atreal_openads, + openADS_id = '3' + ) -def test_strip_tags(): - s = 'aaa b cc ' - assert strip_tags(s) == s - - ss = s + 'dd' - assert strip_tags(ss) == s + 'dd' - - ss = s + 'dd' - assert strip_tags(ss) == s + 'dd' - - ss = s + 'dd' - assert strip_tags(ss) == s + 'dd' - - ss = s + ' 1 < 3' - assert strip_tags(ss) == s + ' 1 < 3' +@pytest.fixture +def collectivite_1_guichet(db, atreal_openads, collectivite_1): + return Guichet.objects.create( + collectivite = collectivite_1, + ouverture_jour_h = datetime.time(9, 0), + fermeture_jour_h = datetime.time(17, 0), + ouverture_sem_d = 1, # Lundi + fermeture_sem_d = 5, # Vendredi + ouverture_sem_h = datetime.time(8, 30), + fermeture_sem_h = datetime.time(12, 15) + ) -def test_clean_spaces(): - s = 'aaa b cc ' - assert clean_spaces(s) == 'aaa b cc' - - s = 'a\ta b\nb c\rc d\\n\\r\\td' - assert clean_spaces(s) == 'a a b b c c d d' +def upload2ForwardFile(connecteur, path, numero_dossier, type_fichier): + """Convert a file path to a ForwardFile.""" + if path: + rand_id = base64.urlsafe_b64encode(os.urandom(6)) + fwd_file = ForwardFile() + fwd_file.connecteur = connecteur + fwd_file.numero_demande = rand_id + fwd_file.numero_dossier = numero_dossier + fwd_file.type_fichier = type_fichier + fwd_file.orig_filename = os.path.basename(path) + fwd_file.content_type = magic.from_file(path, mime=True) + with open(path, 'r') as fp: + fwd_file.file_hash = get_file_digest(fp) + fwd_file.upload_file = File(open(path, 'r')) + fwd_file.upload_status = 'pending' + return fwd_file + return None -def test_normalize(): - assert normalize(None) == '' +def test_forward_file(atreal_openads): + ff = ForwardFile( + numero_demande='45641531', + numero_dossier=FAKE_NUMERO_DOSSIER, + type_fichier='CERFA', + orig_filename='afile', + file_hash='ffdf456fdsvgb4bgfb6g4f5b', + upload_status='pending', + connecteur=atreal_openads, + collectivite=None + ) + ff.upload_file.save(ff.orig_filename, ContentFile(get_file_data(TEST_FILE_CERFA_DIA))) + ff.save() - s = 'aaa b cc ' - assert normalize(s) == 'aaa b cc' + assert repr(ff) == ( + u'ForwardFile(id=%s,connecteur=%s,collectivite=%s' + ',demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % ( + ff.id, unicode(ff.connecteur), None, + ff.numero_demande, ff.numero_dossier, + ff.type_fichier, ff.orig_filename, ff.upload_status + ) + ).encode('utf-8') - s = 'a\ta b\nb c\rc d\\n\\r\\td' - assert normalize(s) == 'a a b b c c d d' + assert str(ff) == '%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending') + assert unicode(ff) == u'%s[%s]' % (trunc_str_values(ff.orig_filename, 20), 'Pending') + + assert ff.get_status() == 'Pending' + assert ff.get_status('invalid') == 'invalid' + + params = ff.get_url_params() + assert params['connecteur'] == atreal_openads.slug + + assert ff.upload_file is not None + assert ff.upload_file.size > 0 + assert ff.size == ff.upload_file.size + assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887' + + ff.content_type = 'application/pdf' + ff.upload_status = 'success' + ff.save() + assert ff.upload_status == 'success' + assert ff.get_status() == 'Success' + assert ff.content_type == 'application/pdf' + + with pytest.raises(ValueError) as e: + ff.upload_file.size + assert unicode(e.value) == "The 'upload_file' attribute has no file associated with it." + assert ff.size > 0 + assert ff.file_hash == '811588016518eedeb4507f3e4c41be958a03576b0cd20bdb2cb9c6a186dbd887' + + ff.file_hash = '' + ff.update_file_hash() + ff.update_content_type() + ff.save() + assert ff.file_hash == '' + assert ff.content_type == '' + + ff.orig_filename = '' + with pytest.raises(ValidationError) as e: + ff.save() + assert len(e.value.messages) == 1 + assert '__all__' in e.value.message_dict + assert unicode(e.value.message_dict['__all__'][0]) == u"A %s cannot have all the following fields empty: %s." % ( + ff.get_verbose_name(), + ['file_hash', 'orig_filename', 'upload_file']) + + ff.delete() -def test_get_file_data(): - assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read()) - assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read() +def test_collectivite(collectivite_1, collectivite_1_guichet): + col = collectivite_1 + + assert repr(col) == ( + u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % ( + 1, unicode(col.name), unicode(col.connecteur), col.openADS_id, + unicode(col.guichet) if hasattr(col, 'guichet') else None + ) + ).encode('utf-8') + + assert str(col) == col.name.encode('utf-8') + + assert unicode(col) == col.name + + class_fields = Collectivite.get_fields() + assert len(class_fields) == 6 + assert class_fields[0].name == 'id' + assert class_fields[1].name == 'name' + assert class_fields[2].name == 'connecteur' + assert class_fields[3].name == 'openADS_id' + assert class_fields[4].name == 'guichet' + assert class_fields[5].name == 'forward_file' + + instance_fields = col.get_fields_kv() + assert len(instance_fields) == 6 + assert instance_fields[0][0].name == 'id' + assert instance_fields[1][0].name == 'name' + assert instance_fields[2][0].name == 'connecteur' + assert instance_fields[3][0].name == 'openADS_id' + assert instance_fields[4][0].name == 'guichet' + assert instance_fields[5][0].name == 'forward_file' + assert instance_fields[0][1] == col.id + assert instance_fields[1][1] == col.name + assert instance_fields[2][1] is col.connecteur + assert instance_fields[3][1] == col.openADS_id + assert instance_fields[4][1] is col.guichet + assert instance_fields[5][1] is None # shouldn't it be QuerySet? + + params = col.get_url_params() + assert params['connecteur'] == col.connecteur.slug -def test_get_upload_path(app, atreal_openads): - FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') - assert re.search( - r"^pass_openADS_up_%s_%s$" % - ('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'), - get_upload_path(FF)) +def test_guichet(collectivite_1_guichet): + g = collectivite_1_guichet + + assert repr(g) == ( + u'Guichet(id=%s,collectivite=%s,%s)' % ( + 1, unicode(g.collectivite), unicode(g) + ) + ).encode('utf-8') + + assert str(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]'.encode('utf-8') + + assert unicode(g) == u'Monday 08:30 -> Friday 12:15 [09:00/17:00]' + + params = g.get_url_params() + assert params['collectivite'] == g.collectivite.id + + with pytest.raises(Exception) as e: + g.get_list_url() + assert unicode(e.value) == u"Guichet:get_list_url() method should not be called" -def test_trunc_str_values(): - d = {} - assert trunc_str_values(d, 10) == d - d = {'a': '123456789'} - assert trunc_str_values(d, 0) == {'a': u'…'} - d = {'a': '123456789'} - assert trunc_str_values(d, 1) == {'a': u'1…'} - d = {'a': '123456789'} - assert trunc_str_values(d, 2) == {'a': u'12…'} - d = {'a': '123456789'} - assert trunc_str_values(d, 5) == {'a': u'12345…'} - d = {'a': '123456789'} - assert trunc_str_values(d, 8) == {'a': u'12345678…'} - d = {'a': '123456789'} - assert trunc_str_values(d, 9) == {'a': u'123456789'} - d = {'a': '123456789'} - assert trunc_str_values(d, 10) == d +def test_guichet_is_open(collectivite_1_guichet): + g = collectivite_1_guichet - d = {'a': '123456789', 'b123456789': '987654321'} - assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'} + dt_fmt = '%Y-%m-%d %H:%M' - d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}} - assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}} + d_monday = '2019-07-29' + d_sunday = '2019-07-28' + d_saturday = '2019-07-27' + d_friday = '2019-07-26' + d_thursday = '2019-07-25' + d_wednesday = '2019-07-24' + d_tuesday = '2019-07-22' + t_open = '10:44' + t_closed_before = '6:33' + t_closed_after = '20:08' - d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']} - assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']} + for d in [d_monday, d_tuesday, d_wednesday, d_thursday, d_friday]: + for t in [(t_open, True), (t_closed_before, False), (t_closed_after, False)]: + dt = datetime.datetime.strptime(d + ' ' + t[0], dt_fmt) + assert g.is_open(dt) == t[1] - d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]} - assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]} + dt = datetime.datetime.strptime(d_friday + ' 16:12', dt_fmt) + assert g.is_open(dt) == False + + for d in [d_saturday, d_sunday]: + for t in [t_open, t_closed_before, t_closed_after]: + dt = datetime.datetime.strptime(d + ' ' + t, dt_fmt) + assert g.is_open(dt) == False + + with pytest.raises(TypeError) as e: + g.is_open('invalid datetime') + assert unicode(e.value) == u"is_open() expect a datetime object (not a %s)" % type('') + + assert g.is_open(None) == False -def test_dict_dumper(): - d = {} - dd = DictDumper(d, use_json_dumps=False) - assert d == dd.dic - assert unicode(d) == unicode(dd) - dd = DictDumper(d, 0, use_json_dumps=False) - assert d == dd.dic - assert unicode(d) == unicode(dd) - - d = {'a': '123456789'} - dd = DictDumper(d, 10, use_json_dumps=False) - assert d == dd.dic - assert unicode(d) == unicode(dd) - dd = DictDumper(d, 5, use_json_dumps=False) - assert d == dd.dic - assert unicode(dd) == unicode({'a': u'12345…'}) - dd = DictDumper(d, 5, use_json_dumps=True) - assert d == dd.dic - assert unicode(dd) == u'{"a": "12345\\u2026"}' - - -def test_openads_log_json_payload(app, atreal_openads): +def test_openads_log_json_payload(atreal_openads): # TODO implement assert True # change the debug file path @@ -170,7 +284,7 @@ def test_openads_log_json_payload(app, atreal_openads): # check that what was is logged is correct -def test_openads_get_files_from_json_payload(app, atreal_openads): +def test_openads_get_files_from_json_payload(atreal_openads): title = 'payload' assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}] @@ -194,7 +308,7 @@ def test_openads_get_files_from_json_payload(app, atreal_openads): assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title) -def test_check_file_dict(app, atreal_openads): +def test_check_file_dict(atreal_openads): title = 'payload' d = { @@ -235,7 +349,7 @@ def test_check_file_dict(app, atreal_openads): assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title) -def test_get_first_file_from_json_payload(app, atreal_openads): +def test_get_first_file_from_json_payload(atreal_openads): title = 'payload' d = { @@ -250,7 +364,7 @@ def test_get_first_file_from_json_payload(app, atreal_openads): d, title, ensure_content=True, b64=False) == d['files'][0] -def test_openads_check_status(app, atreal_openads): +def test_openads_check_status(atreal_openads): fake_resp_json = { 'message': 'Service online' } @@ -261,7 +375,7 @@ def test_openads_check_status(app, atreal_openads): assert jresp['response'] == 200 -def test_openads_create_dossier(app, atreal_openads): +def test_openads_create_dossier(atreal_openads, collectivite_1, collectivite_1_guichet): fake_req_json = { "fields": { @@ -272,6 +386,7 @@ def test_openads_create_dossier(app, atreal_openads): # mandataire "mandataire_prenom" : "John", "mandataire_nom" : "Man", + "mandataire_email" : "mandataire_email@domain.example", "mandataire_qualite" : "Une personne morale", "mandataire_qualite_raw" : "Une personne morale", @@ -288,6 +403,7 @@ def test_openads_create_dossier(app, atreal_openads): # petitionnaire "prenom": "Toto", "nom" : "Loulou", + "email" : "petitionnaire_email@domain.example", "qualite" : "Un particulier", "qualite_raw": "Un particulier", @@ -325,12 +441,12 @@ def test_openads_create_dossier(app, atreal_openads): "plan_cadastral_2": { "content" : get_file_data(TEST_FILE_PLAN_CADASTRAL), "content_type": "application/pdf", - "filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL) + #"filename" : 'plan_cad' }, "pouvoir_mandat": { "content" : get_file_data(TEST_FILE_CERFA_DIA), "content_type": "application/pdf", - "filename" : 'mandat.pdf' + "filename" : 'mandat' } } } @@ -354,15 +470,50 @@ def test_openads_create_dossier(app, atreal_openads): with pytest.raises(ValueError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp_bad - atreal_openads.create_dossier(req, 'DIA',collectivite='not an integer') + atreal_openads.create_dossier(req, 'DIA', collectivite='not an integer') assert unicode(e.value) == "invalid literal for int() with base 10: 'not an integer'" with pytest.raises(APIError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp_bad - atreal_openads.create_dossier(req, 'DIA') + atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert unicode(e.value) == "HTTP error: 502" + # TODO update the code and return message when it will be + # correctly implemented in the openADS.API side. + fake_resp_404 = Response() + fake_resp_404.status_code = 404 + fake_resp_404.reason = 'Page not found' + + with pytest.raises(APIError) as e: + with mock.patch('passerelle.utils.Request.post') as requests_post: + requests_post.return_value = fake_resp_404 + atreal_openads.create_dossier(req, 'DIA', collectivite=999) + assert unicode(e.value) == "HTTP error: 404" + + # guichet is open from Monday/8:30 to Friday/12:15, between 9:00 and 17:00 + now = datetime.datetime(2019, 8, 10, 16, 0, 0) + jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now) + assert jresp is not None + assert len(jresp) == 1 + assert 'message' in jresp + assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1 + + now = '2019-08-10 16:00:00' + jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now) + assert jresp is not None + assert len(jresp) == 1 + assert 'message' in jresp + assert jresp['message'] == u"Guichet closed for collectivite '%s'" % collectivite_1 + + now = {'invalid': 'type'} + with pytest.raises(APIError) as e: + jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id, now=now) + assert unicode(e.value) == u"Invalid value of type '%s' for now argument of endpoint '%s' (must be: %s)" % ( + type(now), + 'create_dossier', + "datetime or string formatted to '%s'" % '%Y-%m-%d %H:%M:%S') + fake_resp_json = { 'numero_dossier' : FAKE_NUMERO_DOSSIER, 'files': [{ @@ -380,7 +531,7 @@ def test_openads_create_dossier(app, atreal_openads): with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - jresp = atreal_openads.create_dossier(req, 'DIA') + jresp = atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert jresp['numero_dossier'] == fake_resp_json['numero_dossier'] assert jresp['recepisse']['b64_content'] == fake_resp_json['files'][0]['b64_content'] assert jresp['recepisse']['content_type'] == 'application/pdf' @@ -391,7 +542,7 @@ def test_openads_create_dossier(app, atreal_openads): with pytest.raises(APIError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - atreal_openads.create_dossier(req, 'DIA') + atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % ( 'numero_dossier', 'string', type({})) @@ -400,7 +551,7 @@ def test_openads_create_dossier(app, atreal_openads): with pytest.raises(APIError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - atreal_openads.create_dossier(req, 'DIA') + atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response" fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}' @@ -408,14 +559,14 @@ def test_openads_create_dossier(app, atreal_openads): with pytest.raises(APIError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - atreal_openads.create_dossier(req, 'DIA') + atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert unicode(e.value) == u'Failed to decode recepisse content from base 64' fake_resp._content = 'df[{gfd;g#vfd' with pytest.raises(APIError) as e: with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - atreal_openads.create_dossier(req, 'DIA') + atreal_openads.create_dossier(req, 'DIA', collectivite_1.openADS_id) assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last() @@ -425,7 +576,7 @@ def test_openads_create_dossier(app, atreal_openads): assert job.method_name == 'upload_user_files' assert job.natural_id == FAKE_NUMERO_DOSSIER assert job.parameters is not None - assert len(job.parameters) == 3 + assert len(job.parameters) == 4 assert 'file_ids' in job.parameters assert len(job.parameters['file_ids']) == 4 file_ids = job.parameters['file_ids'] @@ -456,7 +607,7 @@ def test_openads_create_dossier(app, atreal_openads): assert FF.upload_status == 'success' -def test_openads_get_dossier(app, atreal_openads): +def test_openads_get_dossier(atreal_openads): fake_resp_bad = Response() fake_resp_bad.status_code = 502 fake_resp_bad.reason = 'Bad gateway' @@ -513,34 +664,7 @@ def test_openads_get_dossier(app, atreal_openads): assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD" -def test_openads_upload2ForwardFile(app, atreal_openads): - FF = atreal_openads.upload2ForwardFile(None, None, None) - assert FF is None - - FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') - assert isinstance(FF, ForwardFile) - assert len(FF.numero_demande) > 0 - assert FF.numero_dossier == FAKE_NUMERO_DOSSIER - assert FF.type_fichier == 'cerfa' - assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA) - assert FF.content_type == 'application/pdf' - assert len(FF.file_hash) > 0 - assert isinstance(FF.upload_file, File) - assert FF.upload_status == 'pending' - - FF = atreal_openads.upload2ForwardFile(TEST_FILE_PLAN_CADASTRAL, FAKE_NUMERO_DOSSIER, 'plan') - assert isinstance(FF, ForwardFile) - assert len(FF.numero_demande) > 0 - assert FF.numero_dossier == FAKE_NUMERO_DOSSIER - assert FF.type_fichier == 'plan' - assert FF.orig_filename == os.path.basename(TEST_FILE_PLAN_CADASTRAL) - assert FF.content_type == 'application/pdf' - assert len(FF.file_hash) > 0 - assert isinstance(FF.upload_file, File) - assert FF.upload_status == 'pending' - - -def test_openads_get_fwd_files(app, atreal_openads): +def test_openads_get_fwd_files(atreal_openads): with pytest.raises(APIError) as e: atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer') assert unicode(e.value) == u"fichier_id must be an integer" @@ -553,7 +677,7 @@ def test_openads_get_fwd_files(app, atreal_openads): assert resp_empty is not None assert len(resp_empty) == 0 - FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') + FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') FF.save() assert isinstance(FF, ForwardFile) @@ -576,12 +700,12 @@ def test_openads_get_fwd_files(app, atreal_openads): assert jresp[0]['last_update_datetime'] == FF.last_update_datetime -def test_openads_get_fwd_files_status(app, atreal_openads): +def test_openads_get_fwd_files_status(atreal_openads): with pytest.raises(Http404) as e: atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18) assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value)) - FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') + FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') FF.save() assert isinstance(FF, ForwardFile) @@ -606,7 +730,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads): assert len(jresp['failed']) == 0 -def test_openads_get_courrier(app, atreal_openads): +def test_openads_get_courrier(atreal_openads): lettre_type = 'dia_renonciation_preempter' fake_resp_bad = Response() @@ -655,7 +779,7 @@ def test_openads_get_courrier(app, atreal_openads): assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content -def test_get_response_error(app, atreal_openads): +def test_get_response_error(atreal_openads): fake_resp_json = { 'errors': [ { @@ -685,16 +809,34 @@ def test_get_response_error(app, atreal_openads): assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content) -def test_openads_upload_user_files(app, atreal_openads): +def test_openads_upload_user_files(atreal_openads): - # TODO check logs (because this doesn't do anything but log) - atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[]) + req = HttpRequest() + req._body = '' + req.path = '/upload_user_files' + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_type = 'application/json' + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False with pytest.raises(ForwardFile.DoesNotExist) as e: - atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999]) - assert unicode(e.value) == u"ForwardFile matching query does not exist." + atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[999]) + assert unicode(e.value) == u"The following ForwardFile IDs were not found: %s." % [999] - FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') + with pytest.raises(ValueError) as e: + atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids='invalid string') + assert unicode(e.value) == u"invalid literal for int() with base 10: '%s'" % 'invalid string' + + with pytest.raises(TypeError) as e: + atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids={'invalid':'type'}) + assert unicode(e.value) == u"Invalid 'file_ids' argument type '%s' (must be string or list)" % type({'invalid':'type'}) + + FF = upload2ForwardFile(atreal_openads, TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa') FF.save() assert isinstance(FF, ForwardFile) assert FF.upload_status == 'pending' @@ -705,9 +847,11 @@ def test_openads_upload_user_files(app, atreal_openads): fake_resp_bad.status_code = 502 fake_resp_bad.reason = 'Bad gateway' - with mock.patch('passerelle.utils.Request.post') as requests_post: - requests_post.return_value = fake_resp_bad - atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id]) + with pytest.raises(APIError) as e: + with mock.patch('passerelle.utils.Request.post') as requests_post: + requests_post.return_value = fake_resp_bad + atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=str(file_id)) + assert unicode(e.value) == u'HTTP error: 502' FFup = ForwardFile.objects.get(id=file_id) assert isinstance(FFup, ForwardFile) @@ -717,6 +861,9 @@ def test_openads_upload_user_files(app, atreal_openads): assert FFup.upload_status == 'failed' assert FFup.upload_msg == "HTTP error: 502" + FFup.upload_status = 'pending' + FFup.save() + fake_resp = Response() fake_resp.status_code = 200 fake_resp.headers = {'Content-Type': 'application/json'} @@ -724,9 +871,11 @@ def test_openads_upload_user_files(app, atreal_openads): fake_resp.reason = 'OK' fake_resp._content = 'invalid_;{[content}' - with mock.patch('passerelle.utils.Request.post') as requests_post: - requests_post.return_value = fake_resp - atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id]) + with pytest.raises(APIError) as e: + with mock.patch('passerelle.utils.Request.post') as requests_post: + requests_post.return_value = fake_resp + atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id]) + assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp.content FFup = ForwardFile.objects.get(id=file_id) assert isinstance(FFup, ForwardFile) @@ -736,11 +885,19 @@ def test_openads_upload_user_files(app, atreal_openads): assert FFup.upload_status == 'failed' assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content + jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER) + assert jresp == {'message': 'no file to transfer'} + + FFup = ForwardFile.objects.get(id=file_id) + FFup.upload_status = 'pending' + FFup.save() + fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER fake_resp._content = json.dumps(fake_resp_json) with mock.patch('passerelle.utils.Request.post') as requests_post: requests_post.return_value = fake_resp - atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id]) + jresp = atreal_openads.upload_user_files(req, 'DIA', FAKE_NUMERO_DOSSIER) + assert jresp == {'message': 'all files transfered successfully'} FFup = ForwardFile.objects.get(id=file_id) assert isinstance(FFup, ForwardFile) diff --git a/tests/test_forms.py b/tests/test_forms.py new file mode 100644 index 0000000..f1e8520 --- /dev/null +++ b/tests/test_forms.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- + +import pytest +import os +import base64 +import datetime + +from django.core.files import File + +from atreal_openads.forms import ( + ForwardFileForm, + CollectiviteForm, + GuichetForm +) + +from atreal_openads.models import ( + ForwardFile, + Guichet, + Collectivite, + AtrealOpenads +) + + +CONNECTOR_NAME = 'atreal-openads' +CONNECTOR_SLUG = 'atreal' +COLLECTIVITE = 79 +OPENADS_API_LOGIN = 'publik-passerelle' +OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20)) +OPENADS_API_URL = 'http://openads.api/' + +FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20)) +FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10)) + +TESTS_DIR = os.path.dirname(__file__) +RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources') +TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf') +TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf') + + +@pytest.fixture +def atreal_openads(db): + return AtrealOpenads.objects.create( + slug = CONNECTOR_SLUG, + default_collectivite_openADS_id = COLLECTIVITE, + openADS_API_url = OPENADS_API_URL, + basic_auth_username = OPENADS_API_LOGIN, + basic_auth_password = OPENADS_API_PASSWORD + ) + +@pytest.fixture +def collectivite_1(db, atreal_openads): + return Collectivite.objects.create( + name = u'Macollectivité', + connecteur = atreal_openads, + openADS_id = '3' + ) + +@pytest.fixture +def collectivite_1_guichet(db, atreal_openads, collectivite_1): + return Guichet.objects.create( + collectivite = collectivite_1, + ouverture_jour_h = datetime.time(9, 0), + fermeture_jour_h = datetime.time(17, 0), + ouverture_sem_d = 1, # Lundi + fermeture_sem_d = 5, # Vendredi + ouverture_sem_h = datetime.time(8, 30), + fermeture_sem_h = datetime.time(12, 15) + ) + + +def test_forwardfile_form(atreal_openads, collectivite_1): + form = ForwardFileForm() + assert form.instance is not None + + ff = ForwardFile( + connecteur = None, + collectivite = None, + numero_demande = '45641531', + numero_dossier = FAKE_NUMERO_DOSSIER, + type_fichier = 'CERFA', + orig_filename = os.path.basename(TEST_FILE_CERFA_DIA), + content_type = 'application/pdf', + file_hash = 'ffdf456fdsvgb4bgfb6g4f5b', + upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')), + upload_status = 'pending' + ) + + form_with_instance = ForwardFileForm(instance=ff, collectivite=collectivite_1) + assert form_with_instance.instance is ff + assert form_with_instance.instance.collectivite is collectivite_1 + + form_with_instance = ForwardFileForm(instance=ff, connecteur=atreal_openads) + assert form_with_instance.instance is ff + assert form_with_instance.instance.connecteur is atreal_openads + + # TODO check the queryset of the collectivite + + +def test_collectivite_form(atreal_openads): + form = CollectiviteForm() + assert form.instance is not None + + col = Collectivite( + connecteur = None, + name = u'Ma collectivité', + openADS_id = 3 + ) + + form_with_instance = CollectiviteForm(instance=col, connecteur=atreal_openads) + assert form_with_instance.instance is col + assert form_with_instance.instance.connecteur is atreal_openads + + +def test_guichet_form(atreal_openads, collectivite_1): + form = GuichetForm() + assert form.instance is not None + + gui = Guichet( + collectivite = None, + ouverture_jour_h = datetime.time(9, 0, 0), + fermeture_jour_h = datetime.time(18, 0, 0), + ouverture_sem_d = 1, + fermeture_sem_d = 5, + ouverture_sem_h = datetime.time(10, 30, 0), + fermeture_sem_h = datetime.time(12, 15, 0) + ) + + form_with_instance = GuichetForm(instance=gui, collectivite=collectivite_1) + assert form_with_instance.instance is gui + assert form_with_instance.instance.collectivite is collectivite_1 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..56f8ec7 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- + +# to run it use the following command in the 'tests' directory: +# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv +# +# and with 'coverage': +# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads + +import pytest +import os +import base64 +import re +import datetime + +from django.core.files import File +from django.core.files.base import ContentFile + +from atreal_openads.utils import ( + to_dash_case, + force_encoded_string_output, + strip_tags, + clean_spaces, + normalize, + get_file_data, + get_file_digest, + get_upload_path, + get_file_extension, + trunc_str_values, + DictDumper +) + +from atreal_openads.models import ( + ForwardFile, + Guichet, + Collectivite, + AtrealOpenads +) + + +CONNECTOR_NAME = 'atreal-openads' +CONNECTOR_SLUG = 'atreal' +COLLECTIVITE = 79 +OPENADS_API_LOGIN = 'publik-passerelle' +OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20)) +OPENADS_API_URL = 'http://openads.api/' + +FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20)) +FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10)) + +TESTS_DIR = os.path.dirname(__file__) +RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources') +TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf') +TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf') + + +@pytest.fixture +def atreal_openads(db): + return AtrealOpenads.objects.create( + slug = CONNECTOR_SLUG, + default_collectivite_openADS_id = COLLECTIVITE, + openADS_API_url = OPENADS_API_URL, + basic_auth_username = OPENADS_API_LOGIN, + basic_auth_password = OPENADS_API_PASSWORD + ) + +@pytest.fixture +def collectivite_1(db, atreal_openads): + return Collectivite.objects.create( + name = u'Macollectivité', + connecteur = atreal_openads, + openADS_id = '3' + ) + +@pytest.fixture +def collectivite_1_guichet(db, atreal_openads, collectivite_1): + return Guichet.objects.create( + collectivite = collectivite_1, + ouverture_jour_h = datetime.time(9, 0), + fermeture_jour_h = datetime.time(17, 0), + ouverture_sem_d = 1, # Lundi + fermeture_sem_d = 5, # Vendredi + ouverture_sem_h = datetime.time(8, 30), + fermeture_sem_h = datetime.time(12, 15) + ) + + +def test_to_dash_case(): + s = 'ACamelCaseName' + assert to_dash_case(s) == 'a-camel-case-name' + + assert to_dash_case('') == '' + + +def test_force_encoded_string_output(): + def a_str_function(): + return str('toto') + ret = force_encoded_string_output(a_str_function)() + assert isinstance(ret, str) + ret = force_encoded_string_output(a_str_function, 'latin1')() + assert isinstance(ret, str) + + def an_unicode_function(): + return u'toto' + ret = force_encoded_string_output(an_unicode_function)() + assert isinstance(ret, str) + ret = force_encoded_string_output(an_unicode_function, 'latin1')() + assert isinstance(ret, str) + + +def test_strip_tags(): + s = 'aaa b cc ' + assert strip_tags(s) == s + + ss = s + 'dd' + assert strip_tags(ss) == s + 'dd' + + ss = s + 'dd' + assert strip_tags(ss) == s + 'dd' + + ss = s + 'dd' + assert strip_tags(ss) == s + 'dd' + + ss = s + ' 1 < 3' + assert strip_tags(ss) == s + ' 1 < 3' + + +def test_clean_spaces(): + s = 'aaa b cc ' + assert clean_spaces(s) == 'aaa b cc' + + s = 'a\ta b\nb c\rc d\\n\\r\\td' + assert clean_spaces(s) == 'a a b b c c d d' + + +def test_normalize(): + assert normalize(None) == '' + + s = 'aaa b cc ' + assert normalize(s) == 'aaa b cc' + + s = 'a\ta b\nb c\rc d\\n\\r\\td' + assert normalize(s) == 'a a b b c c d d' + + +def test_get_file_data(): + assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read()) + assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read() + + +def test_get_file_digest(): + with open(TEST_FILE_CERFA_DIA) as fd: + assert get_file_digest(fd) == 'cc90a620982760fdee16a5b4fe1b5ac3b4fe868fd02d2f70b27f1e46d283ea51' + + +def test_get_upload_path(): + ff = ForwardFile( + numero_demande='45641531', + numero_dossier=FAKE_NUMERO_DOSSIER, + type_fichier='CERFA', + orig_filename=os.path.basename(TEST_FILE_CERFA_DIA), + content_type='application/pdf', + file_hash='ffdf456fdsvgb4bgfb6g4f5b', + upload_file=File(open(TEST_FILE_CERFA_DIA, 'r')), + upload_status='pending', + connecteur=None, + collectivite=None + ) + regex = r"^to_openADS__%s__%s\.pdf$" % ( + '[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'ffdf') + assert re.search(regex, get_upload_path(ff)) + + +def test_get_file_extension(): + assert get_file_extension('afile.pdf') == '.pdf' + assert get_file_extension('afile', 'application/pdf') == '.pdf' + assert get_file_extension('') == '' + assert get_file_extension('afile') == '' + + +def test_trunc_str_values(): + d = {} + assert trunc_str_values(d, 10) == d + d = {'a': '123456789'} + assert trunc_str_values(d, 0) == {'a': u'…'} + d = {'a': '123456789'} + assert trunc_str_values(d, 1) == {'a': u'1…'} + d = {'a': '123456789'} + assert trunc_str_values(d, 2) == {'a': u'12…'} + d = {'a': '123456789'} + assert trunc_str_values(d, 5) == {'a': u'12345…'} + d = {'a': '123456789'} + assert trunc_str_values(d, 8) == {'a': u'12345678…'} + d = {'a': '123456789'} + assert trunc_str_values(d, 9) == {'a': u'123456789'} + d = {'a': '123456789'} + assert trunc_str_values(d, 10) == d + + d = {'a': '123456789', 'b123456789': '987654321'} + assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'} + + d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}} + assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}} + + d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']} + assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']} + + d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]} + assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]} + + +def test_dict_dumper(): + d = {} + + dd = DictDumper(d, use_json_dumps=False) + assert repr(dd) == (u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % ( + d, dd.max_str_len, dd.use_json_dumps)).encode('utf-8') + assert str(dd) == '{}' + assert unicode(dd) == u'{}' + + assert d == dd.dic + assert unicode(d) == unicode(dd) + dd = DictDumper(d, 0, use_json_dumps=False) + assert d == dd.dic + assert unicode(d) == unicode(dd) + + d = {'a': '123456789'} + dd = DictDumper(d, 10, use_json_dumps=False) + assert d == dd.dic + assert unicode(d) == unicode(dd) + dd = DictDumper(d, 5, use_json_dumps=False) + assert d == dd.dic + assert unicode(dd) == unicode({'a': u'12345…'}) + dd = DictDumper(d, 5, use_json_dumps=True) + assert d == dd.dic + assert unicode(dd) == u'{"a": "12345\\u2026"}' + + +def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet): + ff = ForwardFile( + numero_demande='45641531', + numero_dossier=FAKE_NUMERO_DOSSIER, + type_fichier='CERFA', + orig_filename=os.path.basename(TEST_FILE_CERFA_DIA), + content_type='application/pdf', + file_hash='ffdf456fdsvgb4bgfb6g4f5b', + upload_file=ContentFile('toto'), + upload_status='pending', + connecteur=atreal_openads, + collectivite=None + ) + + assert ff.get_verbose_name() == 'Forward File' + assert ff.get_verbose_name_plural() == 'Forward Files' + + assert ff.get_class_name() == 'ForwardFile' + assert ff.get_class_name_plural() == 'ForwardFiles' + + assert ff.get_class_name_dash_case() == 'forward-file' + assert ff.get_class_name_plural_dash_case() == 'forward-files' + + assert ff.get_class_name_title() == 'Forward File' + assert ff.get_class_name_plural_title() == 'Forward Files' + + assert ff.get_url_name('list', plural=True) == 'list-forward-files' + + assert ff.get_absolute_url() == '/manage/atreal-openads/atreal/forward-file/None' + assert ff.get_edit_url() == '/manage/atreal-openads/atreal/edit-forward-file/None' + assert ff.get_delete_url() == '/manage/atreal-openads/atreal/delete-forward-file/None' + assert ff.get_list_url() == '/manage/atreal-openads/atreal/forward-files' + + assert atreal_openads.get_class_name_plural() == 'AtrealOpenads' + + assert atreal_openads.get_url_name('view') == 'view-connector' + + params = atreal_openads.get_url_params(True) + assert params['connector'] == 'atreal-openads' + assert params['slug'] == atreal_openads.slug + + with pytest.raises(Exception) as e: + atreal_openads.get_list_url() + assert unicode(e.value) == u"AtrealOpenads:get_list_url() method should not be called" + + # TODO add more collectivite test cases + + with pytest.raises(Exception) as e: + collectivite_1_guichet.get_list_url() + assert unicode(e.value) == u"Guichet:get_list_url() method should not be called" + diff --git a/tests/test_views.py b/tests/test_views.py new file mode 100644 index 0000000..be170a2 --- /dev/null +++ b/tests/test_views.py @@ -0,0 +1,413 @@ +# -*- coding: utf-8 -*- + +import pytest +import os +import base64 +import datetime + +from django.http.request import HttpRequest, QueryDict +from django.urls.base import resolve +from django.core.files import File + +from atreal_openads.views import ( + get_connecteur_from_request, + get_collectivite_from_request, + AtrealOpenadsView, + ForwardFileView, + ForwardFileListView, + ForwardFileUpdateView, + ForwardFileDeleteView, + CollectiviteView, + CollectiviteListView, + CollectiviteCreateView, + CollectiviteUpdateView, + CollectiviteDeleteView, + GuichetView, + GuichetCreateView, + GuichetUpdateView, + GuichetDeleteView +) + +from atreal_openads.models import ( + ForwardFile, + Guichet, + Collectivite, + AtrealOpenads +) + + +CONNECTOR_NAME = 'atreal-openads' +CONNECTOR_SLUG = 'atreal' +COLLECTIVITE = 79 +OPENADS_API_LOGIN = 'publik-passerelle' +OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20)) +OPENADS_API_URL = 'http://openads.api/' + +FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20)) +FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10)) + +TESTS_DIR = os.path.dirname(__file__) +RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources') +TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf') +TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf') + + +@pytest.fixture +def atreal_openads(db): + return AtrealOpenads.objects.create( + slug = CONNECTOR_SLUG, + default_collectivite_openADS_id = COLLECTIVITE, + openADS_API_url = OPENADS_API_URL, + basic_auth_username = OPENADS_API_LOGIN, + basic_auth_password = OPENADS_API_PASSWORD + ) + +@pytest.fixture +def collectivite_1(db, atreal_openads): + return Collectivite.objects.create( + name = u'Macollectivité', + connecteur = atreal_openads, + openADS_id = '3' + ) + +@pytest.fixture +def collectivite_1_guichet(db, atreal_openads, collectivite_1): + return Guichet.objects.create( + collectivite = collectivite_1, + ouverture_jour_h = datetime.time(9, 0), + fermeture_jour_h = datetime.time(17, 0), + ouverture_sem_d = 1, # Lundi + fermeture_sem_d = 5, # Vendredi + ouverture_sem_h = datetime.time(8, 30), + fermeture_sem_h = datetime.time(12, 15) + ) + +@pytest.fixture +def forwardfile_1(db, atreal_openads, collectivite_1): + return ForwardFile.objects.create( + connecteur = atreal_openads, + collectivite = collectivite_1, + numero_demande = '45641531', + numero_dossier = FAKE_NUMERO_DOSSIER, + type_fichier = 'CERFA', + orig_filename = os.path.basename(TEST_FILE_CERFA_DIA), + content_type = 'application/pdf', + file_hash = 'ffdf456fdsvgb4bgfb6g4f5b', + upload_file = File(open(TEST_FILE_CERFA_DIA, 'r')), + upload_status = 'pending' + ) + + +def test_get_connecteur_from_request(atreal_openads, forwardfile_1): + req = HttpRequest() + req.path = '/manage/atreal-openads/%s/forward-file/%s' % ( + atreal_openads.slug, forwardfile_1.id) + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = ForwardFileView() + view.request = req + + connecteur = get_connecteur_from_request(view) + assert connecteur is not None + assert connecteur.slug == atreal_openads.slug + + +def test_get_collectivite_from_request(atreal_openads, collectivite_1): + req = HttpRequest() + req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % ( + atreal_openads.slug, collectivite_1.id) + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = ForwardFileListView() + view.request = req + + collectivite = get_collectivite_from_request(view) + assert collectivite is not None + assert collectivite.id == collectivite_1.id + + +def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1): + req = HttpRequest() + req.path = '/manage/atreal-openads/%s/forward-file/%s' % ( + atreal_openads.slug, forwardfile_1.id) + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = ForwardFileView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + + view = ForwardFileUpdateView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/forward-file/%s' % ( + atreal_openads.slug, forwardfile_1.id) + req.GET['back-to'] = 'list-forward-files' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug + req.GET['back-to'] = 'col-list-forward-files' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % ( + atreal_openads.slug, collectivite_1.id) + + view = ForwardFileDeleteView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + del(req.GET['back-to']) + url = view.get_success_url() + assert url == u'/atreal-openads/%s/' % atreal_openads.slug + req.GET['back-to'] = 'list-forward-files' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/forward-files' % atreal_openads.slug + req.GET['back-to'] = 'col-list-forward-files' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % ( + atreal_openads.slug, collectivite_1.id) + + req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % ( + atreal_openads.slug, collectivite_1.id) + req.resolver_match = resolve(req.path) + view = ForwardFileListView() + view.request = req + view.object_list = [] + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite'].id == collectivite_1.id + + qs = view.get_queryset() + assert qs.query is not None + assert qs.query.order_by == ['id'] + assert qs.query.default_ordering == True + assert qs.query.get_meta().ordering == ['-last_update_datetime'] + assert qs.ordered + + req.GET['order-by'] = '-id' + qs = view.get_queryset() + assert qs.query is not None + assert qs.query.order_by == ['-id'] + assert qs.query.default_ordering == True + + req.path = '/manage/atreal-openads/%s/forward-files' % atreal_openads.slug + req.resolver_match = resolve(req.path) + del(req.GET['back-to']) + del(req.GET['order-by']) + view = ForwardFileListView() + view.request = req + view.object_list = [] + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + + qs = view.get_queryset() + assert qs.query is not None + assert qs.query.order_by == ['id'] + assert qs.query.default_ordering == True + assert qs.query.get_meta().ordering == ['-last_update_datetime'] + assert qs.ordered + + +def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1): + req = HttpRequest() + req.path = '/manage/atreal-openads/%s/collectivite/%s' % ( + atreal_openads.slug, collectivite_1.id) + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = CollectiviteView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['guichet_add_url'] == u'/manage/atreal-openads/%s/collectivite/%s/create-guichet' % ( + atreal_openads.slug, collectivite_1.id) + assert context['forward_files_list_url'] == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % ( + atreal_openads.slug, collectivite_1.id) + + view = CollectiviteUpdateView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivite/%s' % ( + atreal_openads.slug, collectivite_1.id) + req.GET['back-to'] = 'list-collectivites' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug + + view = CollectiviteDeleteView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + del(req.GET['back-to']) + url = view.get_success_url() + assert url == u'/atreal-openads/%s/' % atreal_openads.slug + req.GET['back-to'] = 'list-collectivites' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug + + view = CollectiviteCreateView() + req.path = '/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug + req.resolver_match = resolve(req.path) + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + kwargs = view.get_form_kwargs() + assert kwargs['connecteur'].slug == atreal_openads.slug + del(req.GET['back-to']) + url = view.get_success_url() + assert url == u'/atreal-openads/%s/' % atreal_openads.slug + req.GET['back-to'] = 'list-collectivites' + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivites' % atreal_openads.slug + + req.path = '/manage/atreal-openads/%s/collectivites' % atreal_openads.slug + req.resolver_match = resolve(req.path) + view = CollectiviteListView() + view.request = req + view.object_list = [] + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug + + qs = view.get_queryset() + assert qs.query is not None + assert qs.query.order_by == ['id'] + assert qs.query.default_ordering == True + assert qs.query.get_meta().ordering == ['name'] + assert qs.ordered + + req.GET['order-by'] = '-id' + qs = view.get_queryset() + assert qs.query is not None + assert qs.query.order_by == ['-id'] + assert qs.query.default_ordering == True + + +def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet): + req = HttpRequest() + req.path = '/manage/atreal-openads/%s/collectivite/%s/guichet/%s' % ( + atreal_openads.slug, collectivite_1.id, collectivite_1_guichet.id) + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = GuichetView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite'].id == collectivite_1.id + + view = GuichetUpdateView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite'].id == collectivite_1.id + + view = GuichetDeleteView() + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite'].id == collectivite_1.id + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivite/%s' % ( + atreal_openads.slug, collectivite_1.id) + + view = GuichetCreateView() + req.path = '/manage/atreal-openads/%s/collectivite/%s/create-guichet' % ( + atreal_openads.slug, collectivite_1.id) + req.resolver_match = resolve(req.path) + view.request = req + view.object = None + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['connecteur'].slug == atreal_openads.slug + assert context['collectivite'].id == collectivite_1.id + kwargs = view.get_form_kwargs() + assert kwargs['collectivite'].id == collectivite_1.id + url = view.get_success_url() + assert url == u'/manage/atreal-openads/%s/collectivite/%s' % ( + atreal_openads.slug, collectivite_1.id) + + +def test_connecteur_view(atreal_openads): + req = HttpRequest() + req.path = '/atreal-openads/%s/' % atreal_openads.slug + req.method = 'GET' + req.encoding = 'utf-8' + req.GET = QueryDict(mutable=True) # required because of encoding setter + req.POST = QueryDict(mutable=True) # required because of encoding setter + req.content_params = None + req.COOKIES = {} + req.META = {} + req._read_started = False + req.resolver_match = resolve(req.path) + + view = AtrealOpenadsView() + view.request = req + view.object = atreal_openads + view.kwargs = req.resolver_match.kwargs + context = view.get_context_data() + assert context['collectivite_fields'] == Collectivite.get_fields() + assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug +