Refactoring: fix most of the (many many) 'pylint' complaints

This commit is contained in:
Michael Bideau 2019-08-22 14:44:52 +00:00
parent 2786bc29e7
commit afcca49f53
11 changed files with 1953 additions and 1203 deletions

408
.pylintrc Normal file
View File

@ -0,0 +1,408 @@
[MASTER]
# Specify a configuration file.
#rcfile=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=CVS
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=
# Pickle collected data for later comparisons.
persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint.
jobs=1
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code
extension-pkg-whitelist=
# Allow optimization of some AST trees. This will activate a peephole AST
# optimizer, which will apply various small optimizations. For instance, it can
# be used to obtain the result of joining multiple strings with the addition
# operator. Joining a lot of strings can lead to a maximum recursion error in
# Pylint and this flag can prevent that. It has one side effect, the resulting
# AST will be different than the one from reality. This option is deprecated
# and it will be removed in Pylint 2.0.
optimize-ast=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
confidence=
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
#disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating
disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
output-format=text
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]". This option is deprecated
# and it will be removed in Pylint 2.0.
files-output=no
# Tells whether to display a full report or only the messages
reports=yes
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
#msg-template=
[BASIC]
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
property-classes=abc.abstractproperty
# Regular expression matching correct function names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for function names
function-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for variable names
variable-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct constant names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$
# Naming hint for constant names
const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__))$
# Regular expression matching correct attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for attribute names
attr-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for argument names
argument-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct class attribute names
class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Naming hint for class attribute names
class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Regular expression matching correct inline iteration names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Naming hint for inline iteration names
inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$
# Regular expression matching correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Naming hint for class names
class-name-hint=[A-Z_][a-zA-Z0-9]+$
# Regular expression matching correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Naming hint for module names
module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression matching correct method names
method-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for method names
method-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
[ELIF]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
[TYPECHECK]
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the name of dummy variables (i.e. expectedly
# not used).
dummy-variables-rgx=(_+[a-zA-Z0-9]*?$)|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,future.builtins
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format
logging-modules=logging
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=100
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,dict-separator
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
[SPELLING]
# Spelling dictionary name. Available dictionaries: none. To make it working
# install python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to indicated private dictionary in
# --spelling-private-dict-file option instead of raising a message.
spelling-store-unknown-words=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=mcs
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,_fields,_replace,_source,_make
[DESIGN]
# Maximum number of arguments for function / method
max-args=5
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of branch for function / method body
max-branches=12
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of boolean expressions in a if statement
max-bool-expr=5
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=Exception

View File

@ -18,12 +18,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Forms for each of the Models."""
from django.forms import ModelForm
from .models import ForwardFile, Collectivite, Guichet
class ForwardFileForm(ModelForm):
"""Form for the ForwardFile model."""
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
model = ForwardFile
exclude = ['connecteur', 'size', 'file_hash']
@ -34,51 +39,49 @@ class ForwardFileForm(ModelForm):
super(ForwardFileForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur
):
if ((not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur):
self.instance.connecteur = connecteur
if (
(not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite
):
if ((not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite):
self.instance.collectivite = collectivite
# only allow to select a 'collectivite' that belongs to the connecteur
if hasattr(self.instance, 'connecteur') and self.instance.connecteur:
self.fields['collectivite'].queryset = Collectivite.objects.filter(connecteur=self.instance.connecteur)
# pylint: disable=no-member
self.fields['collectivite'].queryset = Collectivite.objects.filter(
connecteur=self.instance.connecteur)
# TODO if the status is 'uploading' make everything read-only
class CollectiviteForm(ModelForm):
"""Form for the Collectivite model."""
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
model = Collectivite
exclude = ['connecteur']
def __init__(self, *args, **kwargs):
connecteur = kwargs.pop('connecteur', None)
super(CollectiviteForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur
):
if ((not hasattr(self.instance, 'connecteur') or not self.instance.connecteur)
and connecteur):
self.instance.connecteur = connecteur
class GuichetForm(ModelForm):
"""Form for the Guichet model."""
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
model = Guichet
exclude = ['collectivite']
def __init__(self, *args, **kwargs):
collectivite = kwargs.pop('collectivite', None)
super(GuichetForm, self).__init__(*args, **kwargs)
if (
(not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite
):
if ((not hasattr(self.instance, 'collectivite') or not self.instance.collectivite)
and collectivite):
self.instance.collectivite = collectivite

View File

@ -18,6 +18,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""JSON schemas used by endpoints to validate input/ouput."""
# TODO add string limits (maxLength)
@ -48,6 +50,7 @@ JSON_SCHEMA_FILE_B64 = {
JSON_SCHEMA_DATE_FRENCH = {
"type": "string",
# pylint: disable=anomalous-backslash-in-string
"pattern": "^(0?[1-9]|[12][0-9]|3[01])/(0?[1-9]|1[012])/\d{4}$"
}
@ -230,8 +233,8 @@ JSON_SCHEMA_CREATE_DOSSIER_IN = {
"anyOf": [
# if mandataire_qualite == "Un particulier"
# "mandataire_denomination" and "mandataire_raison_sociale" are not required
# and must be null
# "mandataire_denomination" and "mandataire_raison_sociale"
# are not required and must be null
{
"properties": {
"mandataire_qualite": {"const": "Un particulier"},
@ -240,15 +243,18 @@ JSON_SCHEMA_CREATE_DOSSIER_IN = {
}
},
# if mandataire_qualite == "Une personne morale"
# "mandataire_denomination" and "mandataire_raison_sociale" are required
# and must be string
# "mandataire_denomination" and "mandataire_raison_sociale"
# are required and must be string
{
"properties": {
"mandataire_qualite": {"const": "Une personne morale"},
"mandataire_denomination" : {"type": "string"},
"mandataire_raison_sociale": {"type": "string"}
},
"required": ["mandataire_denomination","mandataire_raison_sociale"]
"required": [
"mandataire_denomination",
"mandataire_raison_sociale"
]
}
]
}
@ -367,6 +373,7 @@ JSON_SCHEMA_GET_FWD_FILE_STATUS = {
"description": "The status of a ForwardFile",
"$id" : "#forwardfile-status",
"type": "string",
# pylint: disable=anomalous-backslash-in-string
"pattern": "^\[\w+\] .+ => .+$"
}

View File

@ -18,12 +18,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
"""Models for this connector module.""" # pylint: disable=too-many-lines
import base64
import urlparse
import datetime
import json
import os
import re
import urlparse
import magic
from django.db import models
@ -60,7 +63,7 @@ from .utils import (
)
class ForwardFile(models.Model, BaseModel):
class ForwardFile(models.Model, BaseModel): # pylint: disable=too-many-instance-attributes
"""Represent a file uploaded by a user, to be forwarded to openADS.API."""
STATUSES = [
@ -92,6 +95,7 @@ class ForwardFile(models.Model, BaseModel):
last_update_datetime = models.DateTimeField(auto_now=True)
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
verbose_name = _('Forward File')
indexes = [
models.Index(fields=['connecteur'], name='ff_connecteur_idx'),
@ -111,15 +115,16 @@ class ForwardFile(models.Model, BaseModel):
"""
if not status_codename:
status_codename = self.upload_status
for st in self.STATUSES:
if st[0] == status_codename:
return st[1]
for status in self.STATUSES:
if status[0] == status_codename:
return status[1]
return status_codename
@force_encoded_string_output
def __repr__(self):
return u'ForwardFile(id=%s,connecteur=%s,collectivite=%s,demande=%s,dossier=%s,type=%s,filename=%s,status=%s)' % (
self.id,
return (u'ForwardFile(id=%s,connecteur=%s,collectivite=%s'
',demande=%s,dossier=%s,type=%s,filename=%s,status=%s)') % (
self.id, # pylint: disable=no-member
unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
self.numero_demande, self.numero_dossier,
@ -128,15 +133,16 @@ class ForwardFile(models.Model, BaseModel):
def __unicode__(self):
return u"%s[%s]" % (trunc_str_values(self.orig_filename, 20), self.get_status())
def get_url_params(self, *args, **kwargs):
params = super(ForwardFile, self).get_url_params(*args, **kwargs)
def get_url_params(self, primary_key=True):
params = super(ForwardFile, self).get_url_params(primary_key=primary_key)
params['connecteur'] = self.connecteur.slug if self.connecteur else None
return params
def update_content_type(self, only_if_empty=False):
"""Update the content type from the content of the file."""
if not self.content_type or not only_if_empty:
if self.upload_file and self.upload_file.size:
if self.upload_file and self.upload_file.size: # pylint: disable=no-member
# pylint: disable=no-member
self.content_type = magic.from_buffer(self.upload_file.read(1024), mime=True)
else:
self.content_type = ''
@ -144,23 +150,26 @@ class ForwardFile(models.Model, BaseModel):
def update_file_hash(self, only_if_empty=False):
"""Update the file_hash field from the content of the file."""
if not self.file_hash or not only_if_empty:
if self.upload_file and self.upload_file.size:
if self.upload_file and self.upload_file.size: # pylint: disable=no-member
self.file_hash = get_file_digest(self.upload_file)
else:
self.file_hash = ''
# preprocessing data and validate model before saving
# /!\ Attention: this will not be triggered when doing bulk actions like with QuerySet.update()
# @see: https://docs.djangoproject.com/en/2.2/topics/db/models/#overriding-predefined-model-methods
# /!\ Attention:
# this will not be triggered when doing bulk actions like with QuerySet.update()
# @see: https://docs.djangoproject.com/en/2.2/topics/db/models/
# The note entitled "Overridden model methods are not called on bulk operations"
def save(self, *args, **kwargs):
def save(self, *args, **kwargs): # pylint: disable=arguments-differ
# delete file content (on success)
if self.upload_status == 'success':
# pylint: disable=no-member
if self.upload_file and self.upload_file.size > 0:
# pylint: disable=no-member
self.upload_file.delete()
# else, update metadata
else:
self.size = self.upload_file.size if self.upload_file else 0
self.size = self.upload_file.size if self.upload_file else 0 # pylint: disable=no-member
self.update_file_hash()
self.update_content_type(only_if_empty=True)
# validation (calling self.clean())
@ -170,12 +179,12 @@ class ForwardFile(models.Model, BaseModel):
# check that one the following fields must not be blank/null:
# 'file_hash', 'orig_filename', 'upload_file'
# because if they are all empty we dont have any usefull information about the upload
def clean(self, *args, **kwargs):
def clean(self, *args, **kwargs): # pylint: disable=arguments-differ
ret = super(ForwardFile, self).clean(*args, **kwargs)
if (not self.file_hash
and not self.orig_filename
and (not self.upload_file or not self.upload_file.size)
):
# pylint: disable=no-member
and (not self.upload_file or not self.upload_file.size)):
raise ValidationError(
_("A %(object)s cannot have all the following fields empty: %(fields)s." % {
'object': self.get_verbose_name(),
@ -199,6 +208,7 @@ class Collectivite(models.Model, BaseModel):
# 'forward_files' will be a property provided by the related_name of the foreignKey
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
verbose_name = _('Collectivite')
unique_together = ['connecteur', 'openADS_id']
indexes = [
@ -209,17 +219,14 @@ class Collectivite(models.Model, BaseModel):
ordering = ['name']
@classmethod
def get_fields(cls, *args, **kwargs):
def get_fields(cls):
# get_fields() return is immutable, hence the copy
fields = [f for f in super(Collectivite, cls).get_fields(*args, **kwargs)]
fields = [f for f in super(Collectivite, cls).get_fields()]
# moving related fields field at the end of the list
if fields:
rels = []
for rel_name in ['forward_file', 'guichet']:
if (fields[0]
and hasattr(fields[0], 'name')
and fields[0].name == rel_name
):
if (fields[0] and hasattr(fields[0], 'name') and fields[0].name == rel_name):
rels.append(fields.pop(0))
for rel in reversed(rels):
fields.append(rel)
@ -228,31 +235,31 @@ class Collectivite(models.Model, BaseModel):
@force_encoded_string_output
def __repr__(self):
return u'Collectivite(id=%s,name=%s,connecteur=%s,openADS_id=%s,guichet=%s)' % (
self.id, unicode(self.name),
self.id, unicode(self.name), # pylint: disable=no-member
unicode(self.connecteur) if hasattr(self, 'connecteur') else None,
self.openADS_id,
# pylint: disable=no-member
unicode(self.guichet) if hasattr(self, 'guichet') else None)
def __unicode__(self):
return self.name if isinstance(self.name, unicode) else unicode(self.name)
def get_fields_kv(self, *args, **kwargs):
fields = super(Collectivite, self).get_fields_kv(*args, **kwargs)
def get_fields_kv(self):
fields = super(Collectivite, self).get_fields_kv()
# moving related fields field at the end of the list
if fields:
rels = []
for rel_name in ['forward_file', 'guichet']:
if (fields[0] and fields[0][0]
and hasattr(fields[0][0], 'name')
and fields[0][0].name == rel_name
):
and hasattr(fields[0][0], 'name') and fields[0][0].name == rel_name):
rels.append(fields.pop(0))
for rel in reversed(rels):
fields.append(rel)
return fields
def get_url_params(self, *args, **kwargs):
params = super(Collectivite, self).get_url_params(*args, **kwargs)
def get_url_params(self, primary_key=True):
params = super(Collectivite, self).get_url_params(primary_key=primary_key)
# pylint: disable=no-member
params['connecteur'] = self.connecteur.slug if self.connecteur else None
return params
@ -275,12 +282,19 @@ class Guichet(models.Model, BaseModel):
related_name="guichet")
ouverture_jour_h = models.TimeField(_('Hour of opening (each day)'), help_text=_('ex: 08:30'))
fermeture_jour_h = models.TimeField(_('Hour of closing (each day)'), help_text=_('ex: 17:00'))
ouverture_sem_d = models.PositiveIntegerField(_('Day of opening (each week)'), help_text=_('ex: Lundi'), choices=DAYS, default=1)
fermeture_sem_d = models.PositiveIntegerField(_('Day of closing (each week)'), help_text=_('ex: Samedi'), choices=DAYS, default=6)
ouverture_sem_h = models.TimeField(_('Hour of opening (on opening day)'), help_text=_('ex: 08:30'))
fermeture_sem_h = models.TimeField(_('Hour of closing (on closing day)'), help_text=_('ex: 12:15'))
ouverture_sem_d = models.PositiveIntegerField(_('Day of opening (each week)'),
help_text=_('ex: Lundi'),
choices=DAYS, default=1)
fermeture_sem_d = models.PositiveIntegerField(_('Day of closing (each week)'),
help_text=_('ex: Samedi'),
choices=DAYS, default=6)
ouverture_sem_h = models.TimeField(_('Hour of opening (on opening day)'),
help_text=_('ex: 08:30'))
fermeture_sem_h = models.TimeField(_('Hour of closing (on closing day)'),
help_text=_('ex: 12:15'))
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
verbose_name = _('Guichet')
verbose_name_plural = _('Guichets')
indexes = [
@ -291,22 +305,28 @@ class Guichet(models.Model, BaseModel):
@force_encoded_string_output
def __repr__(self):
return u'Guichet(id=%s,collectivite=%s,%s)' % (
self.id,
self.id, # pylint: disable=no-member
unicode(self.collectivite) if hasattr(self, 'collectivite') else None,
unicode(self))
def __unicode__(self):
return u'%s %s -> %s %s [%s/%s]' % (
unicode(self.DAYS[self.ouverture_sem_d - 1][1]),
# pylint: disable=no-member
self.ouverture_sem_h.strftime('%H:%M') if self.ouverture_sem_h else None,
unicode(self.DAYS[self.fermeture_sem_d - 1][1]),
# pylint: disable=no-member
self.fermeture_sem_h.strftime('%H:%M') if self.fermeture_sem_h else None,
# pylint: disable=no-member
self.ouverture_jour_h.strftime('%H:%M') if self.ouverture_jour_h else None,
# pylint: disable=no-member
self.fermeture_jour_h.strftime('%H:%M') if self.fermeture_jour_h else None)
def get_url_params(self, *args, **kwargs):
params = super(Guichet, self).get_url_params(*args, **kwargs)
def get_url_params(self, primary_key=True):
params = super(Guichet, self).get_url_params(primary_key=primary_key)
# pylint: disable=no-member
params['collectivite'] = self.collectivite.id if self.collectivite else None
# pylint: disable=no-member
params['connecteur'] = self.collectivite.connecteur.slug if self.collectivite else None
return params
@ -314,27 +334,27 @@ class Guichet(models.Model, BaseModel):
raise Exception(u"Guichet:get_list_url() method should not be called")
# @raise TypeError if argument is not a datetime object
def is_open(self, dt):
def is_open(self, date_t):
""" Return 'True' if the "Guichet" is open, else False."""
if dt:
if not isinstance(dt, datetime.datetime):
raise TypeError(u"is_open() expect a datetime object (not a %s)" % type(dt))
if date_t:
if not isinstance(date_t, datetime.datetime):
raise TypeError(u"is_open() expect a datetime object (not a %s)" % type(date_t))
ouverture_jour_dt = datetime.datetime.combine(dt, self.ouverture_jour_h)
fermeture_jour_dt = datetime.datetime.combine(dt, self.fermeture_jour_h)
day = dt.isoweekday()
ouverture_jour_date_t = datetime.datetime.combine(date_t, self.ouverture_jour_h)
fermeture_jour_date_t = datetime.datetime.combine(date_t, self.fermeture_jour_h)
day = date_t.isoweekday()
return (
# opening day
(day == self.ouverture_sem_d
and dt.time() > self.ouverture_sem_h and dt < fermeture_jour_dt)
and date_t.time() > self.ouverture_sem_h and date_t < fermeture_jour_date_t)
# closing day
or (day == self.fermeture_sem_d
and dt.time() < self.fermeture_sem_h and dt > ouverture_jour_dt)
and date_t.time() < self.fermeture_sem_h and date_t > ouverture_jour_date_t)
# regular days
or (day > self.ouverture_sem_d
and day < self.fermeture_sem_d
and dt > ouverture_jour_dt
and dt < fermeture_jour_dt
and date_t > ouverture_jour_date_t
and date_t < fermeture_jour_date_t
)
)
@ -344,9 +364,11 @@ class Guichet(models.Model, BaseModel):
class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
"""API that proxy/relay communications with/to openADS."""
default_collectivite_openADS_id = models.PositiveIntegerField(_("Default 'collectivite' (identifier in openADS)"),
default_collectivite_openADS_id = models.PositiveIntegerField(
_("Default 'collectivite' (identifier in openADS)"),
help_text=_('ex: 3'), default=0, blank=True)
openADS_API_url = models.URLField(_('openADS API URL'), max_length=255,
openADS_API_url = models.URLField(
_('openADS API URL'), max_length=255,
help_text=_('ex: https://openads.your_domain.net/api/'), default='')
openADS_API_timeout = 3600
@ -358,27 +380,28 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
category = _('Business Process Connectors')
class Meta:
# pylint: disable=too-few-public-methods,no-init,old-style-class,missing-docstring
verbose_name = _('openADS')
verbose_name_plural = _('openADS')
ordering = ['openADS_API_url']
@classmethod
def get_class_name_plural(cls, *args, **kwargs):
return cls.get_class_name(*args, **kwargs)
def get_class_name_plural(cls):
return cls.get_class_name()
@force_encoded_string_output
def __repr__(self):
return u'AtrealOpenads(id=%s,openADS=%s,login=%s,collectivites=%s,default=%s)' % (
self.id,
self.id, # pylint: disable=no-member
unicode(self.openADS_API_url),
unicode(self.basic_auth_username),
self.collectivites.count(),
self.collectivites.count(), # pylint: disable=no-member
self.default_collectivite_openADS_id)
def __unicode__(self):
return self.slug if isinstance(self.slug, unicode) else unicode(self.slug)
def get_url_name(self, prefix=''):
def get_url_name(self, prefix='', plural=False):
return '%s%s' % (prefix + '-' if prefix else '', 'connector')
def get_url_params(self, primary_key=True):
@ -390,9 +413,10 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
def get_list_url(self):
raise Exception(u"AtrealOpenads:get_list_url() method should not be called")
def get_collectivite(self, openADS_id):
def get_collectivite(self, openads_id):
"""Return the 'collectivite' matching an openADS id."""
return Collectivite.objects.get(connecteur=self,openADS_id=openADS_id)
# pylint: disable=no-member
return Collectivite.objects.get(connecteur=self, openADS_id=openads_id)
def log_json_payload(self, payload, title='payload', max_str_len=100):
"""Log a json paylod surrounded by dashes and with file content filtered."""
@ -400,7 +424,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
self.logger.debug(u"%s", DictDumper(payload, max_str_len))
self.logger.debug(u"----- %s (end) -----", title)
def get_files_from_json_payload(self, payload, title='payload'):
def get_files_from_payload(self, payload, title='payload'):
"""Return files from a JSON payload with all checks and logging."""
# check the 'files' key
@ -428,7 +452,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# return the files
return files
def check_file_dict(self, dict_file, title='payload', b64=True):
def check_file_dict(self, dict_file, title='payload', b64=True): # pylint: disable=no-self-use
"""Ensure a file dict has all its required items."""
# key to get the content
@ -457,11 +481,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
('file.filename', title, 'string', type(dict_file['filename'])))
def get_first_file_from_json_payload(self, payload, title='payload', ensure_content=True, b64=True):
def get_first_file_from_payload(self,
payload,
title='payload',
ensure_content=True,
b64=True):
"""Return the first file from a JSON payload with all checks and logging."""
# get all files
files = self.get_files_from_json_payload(payload, title)
# get all files
files = self.get_files_from_payload(payload, title)
# get the first file
first = files[0]
@ -484,6 +512,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
#~ }
)
# pylint: disable=unused-argument,arguments-differ,keyword-arg-before-vararg
def check_status(self, request=None, *args, **kwargs):
"""Check avaibility of the openADS.API service."""
url = urlparse.urljoin(self.openADS_API_url, '__api__')
@ -495,7 +524,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
methods=['post'],
pattern='^(?P<type_dossier>\w+)/?$',
pattern='^(?P<type_dossier>\w+)/?$', # pylint: disable=anomalous-backslash-in-string
example_pattern='{type_dossier}/',
parameters={
'type_dossier': {'description': _("Type of 'dossier'"), 'example_value': 'DIA'},
@ -503,13 +532,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
'description': _("Use this collectivite (instead of the default one)"),
'example_value': '3'
},
'now': {'description': _("Datetime (or string formatted to: '%s') against which the 'guichet' is checked for opening" % '%Y-%m-%d %H:%M:%S'), 'example_value': 'DIA'},
'now': {'description': _(("Datetime (or string formatted to: '%s') "
"against which the 'guichet' is checked for opening") % (
'%Y-%m-%d %H:%M:%S')), 'example_value': 'DIA'},
},
post={'description': _("Create an openADS 'dossier'"),
'request_body': {
'schema': {
'application/json': JSON_SCHEMA_CREATE_DOSSIER_IN
}
} # pylint: disable=too-many-statements,too-many-branches,too-many-locals
}
#~ 'response_body': {
#~ 'schema': {
@ -518,7 +549,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
}
)
# pylint: disable=unused-argument,keyword-arg-before-vararg
def create_dossier(self, request, type_dossier, collectivite=None, now=None, *args, **kwargs):
"""Create an openADS 'dossier'."""
# loads the request body as JSON content
json_data = json.loads(request.body)
@ -534,7 +567,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
collectivite = self.get_collectivite(collectivite_id)
# no collectivite instance matching that ID
except Collectivite.DoesNotExist:
except Collectivite.DoesNotExist: # pylint: disable=no-member
pass
# a collectivite instance was found
@ -596,12 +629,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# for each type of demandeur with associated prefix
for key, prefix in prefixes.items():
# "qualité" of the demandeur
# "qualite" of the demandeur
qualite = normalize(json_data['fields']['%squalite' % prefix])
# 'type_personne' of the demandeur
type_personne = 'particulier' if qualite == 'Un particulier' else 'personne_morale'
# get the demandeur informations
demandeur = {
"type_personne": 'particulier' if qualite == 'Un particulier' else 'personne_morale',
"type_personne": type_personne,
"typologie" : 'petitionnaire' if key == 'demandeurs' else 'delegataire',
"nom" : normalize(json_data['fields']['%snom' % prefix]),
"prenom" : normalize(json_data['fields']['%sprenom' % prefix]),
@ -618,18 +654,29 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# add fields if the demandeur is not an individual
if qualite != 'Un particulier':
demandeur["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
demandeur["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
self.logger.debug("%s %s => '%s', '%s'", demandeur['prenom'], demandeur['nom'], demandeur['raison_sociale'], demandeur['denomination'])
demandeur["raison_sociale"] = normalize(
json_data['fields']['%sraison_sociale' % prefix])
demandeur["denomination"] = normalize(
json_data['fields']['%sdenomination' % prefix])
self.logger.debug("%s %s => '%s', '%s'",
demandeur['prenom'],
demandeur['nom'],
demandeur['raison_sociale'],
demandeur['denomination'])
# add optional lieu_dit field
if '%slieu_dit' % prefix in json_data['fields'] and json_data['fields']['%slieu_dit' % prefix]:
demandeur["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
if ('%slieu_dit' % prefix in json_data['fields']
and json_data['fields']['%slieu_dit' % prefix]):
demandeur["adresse"]["lieu_dit"] = normalize(
json_data['fields']['%slieu_dit' % prefix])
# add it to the payload
payload[key] = [demandeur]
self.logger.debug(u"Added '%s' to payload: %s %s", key, demandeur['prenom'], demandeur['nom'])
self.logger.debug(u"Added '%s' to payload: %s %s",
key,
demandeur['prenom'],
demandeur['nom'])
# log the payload
self.log_json_payload(payload)
@ -640,12 +687,10 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# prepare files that will be forwarded
files = []
for k in file_keys:
if (
k in json_data['fields']
if (k in json_data['fields']
and json_data['fields'][k]
and isinstance(json_data['fields'][k], dict)
and 'content' in json_data['fields'][k]
):
and 'content' in json_data['fields'][k]):
# get the content decoded from base 64
content = base64.b64decode(json_data['fields'][k]['content'])
@ -664,7 +709,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# check the content type is PDF for file of type CERFA
if k == 'cerfa' and content_type != 'application/pdf':
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
self.logger.warning("CERFA content type is '%s' instead of '%s'",
content_type,
'application/pdf')
# get the filename if specified
filename = None
@ -722,14 +769,12 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
raise APIError(u'No JSON content returned: %r' % response.content[:1000])
# get the recepisse
recepisse = self.get_first_file_from_json_payload(result, title='response')
recepisse = self.get_first_file_from_payload(result, title='response')
# ensure recepisse content type is PDF
if (
'content_type' in recepisse
if ('content_type' in recepisse
and recepisse['content_type']
and recepisse['content_type'] != 'application/pdf'
):
and recepisse['content_type'] != 'application/pdf'):
self.logger.debug(
u"Forcing 'recepisse' content type to '%s' instead of '%s'.",
'application/pdf',
@ -756,31 +801,32 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
('numero_dossier', 'string', type(numero_dossier)))
numero_dossier = normalize(numero_dossier)
self.logger.debug(u"Numéro dossier: %s", numero_dossier)
self.logger.debug(u"Numero dossier: %s", numero_dossier)
# save files to be forwarded to openADS.API
if files:
file_ids = []
for f in files:
for upfile in files:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
FF = ForwardFile()
FF.connecteur = self
forwardfile = ForwardFile()
forwardfile.connecteur = self
if isinstance(collectivite, Collectivite):
FF.collectivite = collectivite
FF.numero_demande = rand_id
FF.numero_dossier = numero_dossier
forwardfile.collectivite = collectivite
forwardfile.numero_demande = rand_id
forwardfile.numero_dossier = numero_dossier
for k in ['type_fichier', 'orig_filename', 'content_type', 'file_hash']:
setattr(FF, k, f[k])
FF.upload_file.save(FF.orig_filename, f['upload_file'])
FF.upload_status = 'pending'
FF.save()
setattr(forwardfile, k, upfile[k])
# pylint: disable=no-member
forwardfile.upload_file.save(forwardfile.orig_filename, upfile['upload_file'])
forwardfile.upload_status = 'pending'
forwardfile.save()
self.logger.debug(
u"Created ForwardFile '%s' for file '%s' (%s)",
FF.id,
FF.orig_filename,
FF.upload_file.path
forwardfile.id, # pylint: disable=no-member
forwardfile.orig_filename,
forwardfile.upload_file.path # pylint: disable=no-member
)
file_ids.append(FF.id)
file_ids.append(forwardfile.id) # pylint: disable=no-member
job = self.add_job('upload_user_files',
natural_id=numero_dossier,
@ -788,13 +834,11 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
type_dossier=type_dossier,
numero_dossier=numero_dossier,
file_ids=file_ids)
self.logger.debug(
u"Added a job '%s' for dossier '%s' (%s) with file ids '%s'",
job.id,
self.logger.debug(u"Added a job '%s' for dossier '%s' (%s) with file ids '%s'",
job.id, # pylint: disable=no-member
numero_dossier,
type_dossier,
file_ids
)
file_ids)
# respond with the 'numero_dossier' and the recepisse file
return {
@ -805,11 +849,13 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
description=_("Get informations about an openADS 'dossier'"),
# pylint: disable=anomalous-backslash-in-string
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'"), 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'}
'numero_dossier': {'description': _("Identifier for 'dossier'"),
'example_value': 'DIA0130551900001'}
},
#~ get={
#~ 'description': _("Get informations about an openADS 'dossier'"),
@ -820,10 +866,13 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
#~ }
)
# pylint: disable=unused-argument,keyword-arg-before-vararg
def get_dossier(self, request, type_dossier, numero_dossier, *args, **kwargs):
"""Get informations about an openADS 'dossier'."""
# make a request to openADS.API
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s' % (type_dossier, numero_dossier))
url = urlparse.urljoin(self.openADS_API_url,
'/dossier/%s/%s' % (type_dossier, numero_dossier))
response = self.requests.get(url)
# response is an error
@ -847,11 +896,13 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
description=_("Get informations about the forwarding of user files to openADS"),
pattern='^(?P<numero_dossier>\w+)/?$',
pattern='^(?P<numero_dossier>\w+)/?$', # pylint: disable=anomalous-backslash-in-string
example_pattern='{numero_dossier}/',
parameters={
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
'numero_dossier': {'description': _("Identifier for 'dossier'"),
'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier"),
'example_value': '78'}
},
#~ get={
#~ 'description': _("Get informations about the forwarding of user files to openADS"),
@ -862,12 +913,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
#~ }
)
# pylint: disable=unused-argument,keyword-arg-before-vararg,no-self-use
def get_fwd_files(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
"""Get informations about the forwarding of user files to openADS."""
payload = []
fwd_files = []
# search for all files matching the 'numero_dossier' number
if not fichier_id:
# pylint: disable=no-member
fwd_files = ForwardFile.objects.filter(numero_dossier=numero_dossier)
# search for a single file
@ -877,9 +931,10 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
except ValueError:
raise APIError('fichier_id must be an integer')
try:
fwd_files = [ForwardFile.objects.get(id=fichier_id)]
except ForwardFile.DoesNotExist:
raise Http404(u"No file matches 'numero_dossier=%s' and 'id=%s'." % (numero_dossier, fichier_id))
fwd_files = [ForwardFile.objects.get(id=fichier_id)] # pylint: disable=no-member
except ForwardFile.DoesNotExist: # pylint: disable=no-member
raise Http404(u"No file matches 'numero_dossier=%s' and 'id=%s'." % (
numero_dossier, fichier_id))
# append each file to the response payload
for fwd_file in fwd_files:
@ -904,11 +959,13 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
description=_("Get informations about the forwarding of a user file to openADS"),
pattern='^(?P<numero_dossier>\w+)/?$',
pattern='^(?P<numero_dossier>\w+)/?$', # pylint: disable=anomalous-backslash-in-string
example_pattern='{numero_dossier}/',
parameters={
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier") , 'example_value': '78'}
'numero_dossier': {'description': _("Identifier for 'dossier'"),
'example_value': 'DIA0130551900001'},
'fichier_id' : {'description': _("File identifier"),
'example_value': '78'}
},
#~ get={
#~ 'description': _("Get informations about the forwarding of a user file to openADS"),
@ -919,7 +976,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
#~ }
)
# pylint: disable=unused-argument,keyword-arg-before-vararg
def get_fwd_files_status(self, request, numero_dossier, fichier_id=None, *args, **kwargs):
"""Get informations about the forwarding of a user file to openADS."""
# get all files matching 'numero_dossier' and 'fichier_id'
fwd_files = self.get_fwd_files(request, numero_dossier, fichier_id)
@ -950,12 +1009,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
description=_("Get a 'courrier' from an openADS 'dossier'"),
# pylint: disable=anomalous-backslash-in-string
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/(?P<lettre_type>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}/{lettre_type}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'"), 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'lettre_type' : {'description': _("Courrier ID to get"), 'example_value': 'dia_renonciation_preempter'}
'numero_dossier': {'description': _("Identifier for 'dossier'"),
'example_value': 'DIA0130551900001'},
'lettre_type' : {'description': _("Courrier ID to get"),
'example_value': 'dia_renonciation_preempter'}
},
#~ get={
#~ 'description': _("Get a 'courrier' from an openADS 'dossier'"),
@ -966,7 +1028,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
#~ }
)
# pylint: disable=unused-argument,keyword-arg-before-vararg
def get_courrier(self, request, type_dossier, numero_dossier, lettre_type, *args, **kwargs):
"""Get a 'courrier' from an openADS 'dossier'."""
# make a request to openADS.API
url = urlparse.urljoin(
@ -990,7 +1054,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
self.log_json_payload(result, 'response')
# get the courrier
courrier = self.get_first_file_from_json_payload(result, title='response')
courrier = self.get_first_file_from_payload(result, title='response')
# decode the courrier from base 64
try:
@ -1001,7 +1065,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# return the 'courrier' file
return {'courrier': courrier}
def get_response_error(self, response):
def get_response_error(self, response): # pylint: disable=no-self-use
"""Return a error string from an HTTP response."""
try:
# load the response as JSON
@ -1034,13 +1098,17 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
@endpoint(
perm='can_access',
description=_("Trigger the uploading of user's files to openADS"),
# pylint: disable=anomalous-backslash-in-string
pattern='^(?P<type_dossier>\w+)/(?P<numero_dossier>\w+)/?$',
example_pattern='{type_dossier}/{numero_dossier}',
parameters={
'type_dossier' : {'description': _("Type of 'dossier'"), 'example_value': 'DIA'},
'numero_dossier': {'description': _("Identifier for 'dossier'"), 'example_value': 'DIA0130551900001'},
'file_ids' : {'description': _("List of ForwardFile IDs to upload (coma separated)"), 'example_value': '12,18'}
},
'numero_dossier': {'description': _("Identifier for 'dossier'"),
'example_value': 'DIA0130551900001'},
'file_ids' : {'description': _(("List of ForwardFile IDs to upload "
"(coma separated)")),
'example_value': '12,18'}
}, # pylint: disable=too-many-statements,too-many-branches,too-many-locals
#~ get={
#~ 'description': _("Trigger the uploading of user's files to openADS"),
#~ 'response_body': {
@ -1051,7 +1119,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
#~ }
)
# @raise ForwareFile.DoesNotExist if not found
def upload_user_files(self, request, type_dossier, numero_dossier, file_ids=None, *args, **kwargs):
# pylint: disable=unused-argument,keyword-arg-before-vararg
def upload_user_files(self, request, type_dossier, numero_dossier, file_ids=None,
*args, **kwargs):
"""A Job to forward user uploaded files to openADS."""
payload = []
@ -1071,13 +1141,14 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# a list of ForwardFile IDs was specified
if file_ids:
# pylint: disable=no-member
fwd_files = ForwardFile.objects.filter(id__in=file_ids).all()
# check that all ids where found
fwd_files_ids = set([ff.id for ff in fwd_files])
file_ids_diff = [item for item in file_ids if item not in fwd_files_ids]
if file_ids_diff:
raise ForwardFile.DoesNotExist(
raise ForwardFile.DoesNotExist( # pylint: disable=no-member
"The following ForwardFile IDs were not found: %s." % file_ids_diff)
# filter out files not in status 'pending'
@ -1094,7 +1165,7 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
else:
# process all ForwardFiles of the 'dossier' (in status 'pending')
fwd_files = ForwardFile.objects.filter(
fwd_files = ForwardFile.objects.filter( # pylint: disable=no-member
numero_dossier=numero_dossier,
upload_status='pending'
).all()
@ -1105,7 +1176,9 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# add the file content and data to the payload
payload.append({
'filename' : fwd_file.orig_filename + ('.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
'filename' : '%s%s' % (
fwd_file.orig_filename,
'.pdf' if fwd_file.orig_filename[-4:] != '.pdf' else ''),
'content_type' : fwd_file.content_type,
'b64_content' : base64.b64encode(fwd_file.upload_file.read()),
'file_type' : fwd_file.type_fichier
@ -1129,7 +1202,8 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
self.log_json_payload(payload, 'payload')
# make the request to openADS.API (with a specific timeout)
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s/files' % (type_dossier, numero_dossier))
url = urlparse.urljoin(self.openADS_API_url,
'/dossier/%s/%s/files' % (type_dossier, numero_dossier))
response = self.requests.post(
url,
json=payload,
@ -1148,12 +1222,11 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
fwd_file.save()
# log (warning) the error message
self.logger.warning(
u"upload_user_files() openADS response is not OK (code: %s) for dossier '%s' and files '%s'",
self.logger.warning((u"upload_user_files() openADS response is not OK "
"(code: %s) for dossier '%s' and files '%s'"),
response.status_code,
numero_dossier,
file_ids
)
file_ids)
# respond with APIError
if request:
@ -1172,15 +1245,15 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# update every files status as 'failed' and save the error message
for fwd_file in fwd_files:
fwd_file.upload_status = 'failed'
fwd_file.upload_msg = u'No JSON content returned: %r' % response.content[:1000]
fwd_file.upload_msg = u'No JSON content returned: %r' % (
response.content[:1000])
fwd_file.save()
# log (warning) the error message
self.logger.warning(
u"upload_user_files() openADS response is not JSON valid for dossier '%s' and files '%s'",
self.logger.warning((u"upload_user_files() openADS response is not JSON valid "
"for dossier '%s' and files '%s'"),
numero_dossier,
fwd_files
)
fwd_files)
# respond with APIError
if request:
@ -1222,3 +1295,6 @@ class AtrealOpenads(BaseResource, HTTPResource, BaseModel):
# respond with message
if request:
return {'message': 'no file to transfer'}
# return something to please pylint
return True

View File

@ -18,6 +18,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""The urls for this connector module and its entities views."""
import re
from django.conf.urls import url
@ -40,10 +42,12 @@ from .views import (
)
# pylint: disable=invalid-name
urlpatterns = [
url(r'^(?P<slug>[\w,-]+)/$', AtrealOpenadsView.as_view(), name='view-connector')
]
# pylint: disable=invalid-name
management_urlpatterns = []
for view in [
@ -59,8 +63,7 @@ for view in [
GuichetView,
GuichetCreateView,
GuichetUpdateView,
GuichetDeleteView
]:
GuichetDeleteView]:
view_class_name = str(view.__name__)
m = re.search(r'^.*(Create|Update|Delete|List)View$', view_class_name)
if m:
@ -72,6 +75,7 @@ for view in [
url_prefix = view_action.replace('update', 'edit') + '-'
regex_base = r'^(?P<connecteur>[\w,-]+)/'
# pylint: disable=anomalous-backslash-in-string
regex_pkey = '/(?P<pk>[\w,-]+)'
url_name = url_prefix + view.model.get_class_name_dash_case()
@ -101,7 +105,7 @@ for view in [
ff_list_regex_url = ForwardFileListView.model.get_class_name_plural_dash_case()
management_urlpatterns += [
url(
r'^(?P<connecteur>[\w,-]+)/collectivite/(?P<collectivite>[\w,-]+)/' + ff_list_regex_url + '$',
r'^(?P<connecteur>[\w,-]+)/collectivite/(?P<collectivite>[\w,-]+)/%s$' % ff_list_regex_url,
ForwardFileListView.as_view(),
name='col-list-' + ff_list_regex_url
)

View File

@ -18,6 +18,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities functions."""
import json
import base64
import datetime
@ -33,8 +35,9 @@ from django.urls import reverse_lazy
def to_dash_case(camel_str):
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', camel_str)
return re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1).lower()
"""Convert a string formatted from camel case to dash case (like snake case with dash)."""
converted = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', camel_str)
return re.sub('([a-z0-9])([A-Z])', r'\1-\2', converted).lower()
# from: https://stackoverflow.com/a/13848698
@ -44,26 +47,27 @@ def force_encoded_string_output(func, default_enc='utf-8'):
def _func(*args, **kwargs):
return func(*args, **kwargs).encode(sys.stdout.encoding or default_enc)
return _func
else:
return func
class MLStripper(HTMLParser):
"""HTML parser that removes html tags."""
# pylint: disable=super-init-not-called
def __init__(self):
self.reset()
self.fed = []
def handle_data(self, d):
self.fed.append(d)
def handle_data(self, data):
self.fed.append(data)
def get_data(self):
"""Get the stripped data as a string."""
return ''.join(self.fed)
def strip_tags(html):
"""Remove html tags from a string."""
s = MLStripper()
s.feed(html)
return s.get_data()
stripper = MLStripper()
stripper.feed(html)
return stripper.get_data()
def clean_spaces(text):
@ -88,10 +92,10 @@ def normalize(value):
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
with open(path, 'r') as file_pt:
if b64:
return base64.b64encode(f.read())
return f.read()
return base64.b64encode(file_pt.read())
return file_pt.read()
# copy-pasted from 'wcs/qommon/misc.py'
@ -100,6 +104,7 @@ def get_file_digest(content, chunk_size=100000):
digest = hashlib.sha256()
content.seek(0)
def read_chunk():
"""Read 'chunk_size' amount of data from the content."""
return content.read(chunk_size)
for chunk in iter(read_chunk, ''):
digest.update(chunk)
@ -134,6 +139,7 @@ def get_file_extension(filename, mimetype=None):
return file_extension if file_extension else ''
# pylint: disable=invalid-encoded-data
def trunc_str_values(value, limit, visited=None, truncate_text=u''):
"""Truncate a string value (not dict keys) and append a truncate text."""
@ -142,11 +148,11 @@ def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
if not value in visited:
if isinstance(value, basestring) and len(value) > limit:
value = value[:limit] + truncate_text
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
elif isinstance(value, (dict, list, tuple)):
visited.append(value)
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
for k,v in iterator:
value[k] = trunc_str_values(v, limit, visited, truncate_text)
for _key, _value in iterator:
value[_key] = trunc_str_values(_value, limit, visited, truncate_text)
return value
@ -191,40 +197,53 @@ class BaseModel(object):
@classmethod
def get_verbose_name(cls):
"""Return the verbose name of the class (helper for META option)."""
# pylint: disable=no-member
return cls._meta.verbose_name
@classmethod
def get_verbose_name_plural(cls):
"""Return the plural form of the verbose name of the class (helper for META option)."""
# pylint: disable=no-member
return cls._meta.verbose_name_plural
@classmethod
def get_class_name(cls):
"""Return the object class name."""
return cls.__name__
@classmethod
def get_class_name_plural(cls):
"""Return the plural form of the object class name."""
return cls.get_class_name() + 's'
@classmethod
def get_class_name_dash_case(cls):
"""Return the object class name formatted to dash case."""
return to_dash_case(cls.get_class_name())
@classmethod
def get_class_name_plural_dash_case(cls):
"""Return the plural form of the object class name
formatted to dash case.
"""
return to_dash_case(cls.get_class_name_plural())
@classmethod
def get_class_name_title(cls):
"""Return the object class name formatted to 'title' case."""
return cls.get_class_name_dash_case().replace('-', ' ').title()
@classmethod
def get_class_name_plural_title(cls):
"""Return the plural form of the object class name
formatted to 'title' case.
"""
return cls.get_class_name_plural_dash_case().replace('-', ' ').title()
@classmethod
def get_fields(cls):
"""Return the fields of the class (helper for META option)."""
# pylint: disable=no-member
return cls._meta.get_fields(include_parents=True, include_hidden=False)
@force_encoded_string_output
@ -234,26 +253,33 @@ class BaseModel(object):
# mainly for the view
def get_fields_kv(self):
"""Return the model's list of field's key value."""
# pylint: disable=no-member
return [(field, getattr(self, field.name, None)) for field in self._meta.get_fields()]
def get_url_name(self, prefix='', plural=False):
"""Return a base name for url for this object."""
class_name_dash_case = self.__class__.get_class_name_dash_case()
if plural:
class_name_dash_case = self.__class__.get_class_name_plural_dash_case()
return '%s%s' % (prefix + '-' if prefix else '', class_name_dash_case)
def get_url_params(self, primary_key=True):
"""Return the parameters for 'reverse()' to build url for this object."""
# pylint: disable=no-member
return {'pk': self.id} if primary_key else {}
def get_absolute_url(self):
"""Return the 'absolute' url for this object."""
return reverse_lazy(self.get_url_name('view'), kwargs=self.get_url_params())
def get_edit_url(self):
"""Return the 'edit' url for this object."""
return reverse_lazy(self.get_url_name('edit'), kwargs=self.get_url_params())
def get_delete_url(self):
"""Return the 'delete' url for this object."""
return reverse_lazy(self.get_url_name('delete'), kwargs=self.get_url_params())
def get_list_url(self):
"""Return the 'list' url for this object."""
return reverse_lazy(self.get_url_name('list', True), kwargs=self.get_url_params(False))

View File

@ -18,6 +18,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Views for the models."""
from django.urls import reverse_lazy
from django.views.generic.detail import DetailView
@ -44,21 +46,28 @@ def get_collectivite_from_request(view, key='collectivite'):
if not hasattr(view, 'collectivite') or not view.collectivite and view.request:
collectivite_id = view.request.resolver_match.kwargs.get(key, None)
if collectivite_id:
# pylint: disable=no-member
view.collectivite = Collectivite.objects.get(id=collectivite_id)
return view.collectivite if hasattr(view, 'collectivite') else None
# pylint: disable=too-many-ancestors
class ForwardFileView(DetailView):
"""View to display a ForwardFile."""
model = ForwardFile
template_name = 'atreal_openads/manage/forwardfile_view.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(ForwardFileView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
# pylint: disable=too-many-ancestors
class ForwardFileListView(ListView):
"""View to display a list of ForwardFiles."""
model = ForwardFile
template_name = 'atreal_openads/manage/forwardfile_list.html'
paginate_by = 50
@ -86,25 +95,28 @@ class ForwardFileListView(ListView):
return qset.order_by(order_by) if order_by else qset # qset.order_by()
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileListView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(ForwardFileListView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
# pylint: disable=too-many-ancestors
class ForwardFileUpdateView(UpdateView):
"""View to edit a ForwardFile."""
model = ForwardFile
form_class = ForwardFileForm
template_name = 'atreal_openads/manage/forwardfile_form.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileUpdateView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(ForwardFileUpdateView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
back_to = self.request.GET.get('back-to')
if back_to == 'list-forward-files':
return reverse_lazy('list-forward-files', kwargs={
@ -120,18 +132,21 @@ class ForwardFileUpdateView(UpdateView):
return self.get_object().get_absolute_url()
# pylint: disable=too-many-ancestors
class ForwardFileDeleteView(DeleteView):
"""View to delete a ForwardFile."""
model = ForwardFile
form_class = ForwardFileForm
template_name = 'atreal_openads/manage/forwardfile_form.html'
def get_context_data(self, *args, **kwargs):
context = super(ForwardFileDeleteView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(ForwardFileDeleteView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
back_to = self.request.GET.get('back-to')
if back_to == 'list-forward-files':
return reverse_lazy('list-forward-files', kwargs={
@ -150,12 +165,15 @@ class ForwardFileDeleteView(DeleteView):
})
# pylint: disable=too-many-ancestors
class CollectiviteView(DetailView):
"""View to display a Collectivite."""
model = Collectivite
template_name = 'atreal_openads/manage/collectivite_view.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(CollectiviteView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['guichet_add_url'] = reverse_lazy('create-guichet', kwargs={
'connecteur' : context['connecteur'].slug,
@ -166,7 +184,10 @@ class CollectiviteView(DetailView):
return context
# pylint: disable=too-many-ancestors
class CollectiviteListView(ListView):
"""View to display a list of Collectivites."""
model = Collectivite
template_name = 'atreal_openads/manage/collectivite_list.html'
paginate_by = 50
@ -186,21 +207,24 @@ class CollectiviteListView(ListView):
return qset.order_by(order_by) if order_by else qset # qset.order_by()
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteListView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(CollectiviteListView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
'connecteur': context['connecteur'].slug})
return context
# pylint: disable=too-many-ancestors
class CollectiviteCreateView(CreateView):
"""View to create a Collectivite."""
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteCreateView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(CollectiviteCreateView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
@ -209,7 +233,7 @@ class CollectiviteCreateView(CreateView):
kwargs['connecteur'] = get_connecteur_from_request(self)
return kwargs
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
@ -220,17 +244,20 @@ class CollectiviteCreateView(CreateView):
})
# pylint: disable=too-many-ancestors
class CollectiviteUpdateView(UpdateView):
"""View to edit a Collectivite."""
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteUpdateView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(CollectiviteUpdateView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
@ -238,17 +265,20 @@ class CollectiviteUpdateView(UpdateView):
return self.get_object().get_absolute_url()
# pylint: disable=too-many-ancestors
class CollectiviteDeleteView(DeleteView):
"""View to delete a Collectivite."""
model = Collectivite
form_class = CollectiviteForm
template_name = 'atreal_openads/manage/collectivite_form.html'
def get_context_data(self, *args, **kwargs):
context = super(CollectiviteDeleteView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(CollectiviteDeleteView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
if self.request.GET.get('back-to') == 'list-collectivites':
return reverse_lazy('list-collectivites', kwargs={
'connecteur' : get_connecteur_from_request(self).slug
@ -259,18 +289,24 @@ class CollectiviteDeleteView(DeleteView):
})
# pylint: disable=too-many-ancestors
class GuichetView(DetailView):
"""View to display a Guichet."""
model = Guichet
template_name = 'atreal_openads/manage/guichet_view.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(GuichetView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
# pylint: disable=too-many-ancestors
class GuichetCreateView(CreateView):
"""View to create a Guichet."""
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
@ -280,8 +316,8 @@ class GuichetCreateView(CreateView):
kwargs['collectivite'] = get_collectivite_from_request(self)
return kwargs
def get_context_data(self, *args, **kwargs):
context = super(GuichetCreateView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(GuichetCreateView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
@ -293,44 +329,52 @@ class GuichetCreateView(CreateView):
})
# pylint: disable=too-many-ancestors
class GuichetUpdateView(UpdateView):
"""View to edit a Guichet."""
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetUpdateView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(GuichetUpdateView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
# pylint: disable=too-many-ancestors
class GuichetDeleteView(DeleteView):
"""View to delete a Guichet."""
model = Guichet
form_class = GuichetForm
template_name = 'atreal_openads/manage/guichet_form.html'
def get_context_data(self, *args, **kwargs):
context = super(GuichetDeleteView, self).get_context_data(*args, **kwargs)
def get_context_data(self, **kwargs):
context = super(GuichetDeleteView, self).get_context_data(**kwargs)
context['connecteur'] = get_connecteur_from_request(self)
context['collectivite'] = get_collectivite_from_request(self)
return context
def get_success_url(self, *args, **kwargs):
def get_success_url(self):
return reverse_lazy('view-collectivite', kwargs={
'connecteur': get_connecteur_from_request(self).slug,
'pk' : get_collectivite_from_request(self).id
})
# pylint: disable=too-many-ancestors
class AtrealOpenadsView(GenericConnectorView):
"""View to display a connector AtrealOpenads."""
model = AtrealOpenads
template_name = 'atreal_openads/manage/connector_view.html'
def get_context_data(self, *args, **kwargs):
context = super(AtrealOpenadsView, self).get_context_data(*args, **kwargs)
def get_context_data(self, slug=None, **kwargs):
context = super(AtrealOpenadsView, self).get_context_data(slug=slug, **kwargs)
context['collectivite_fields'] = Collectivite.get_fields()
context['collectivite_add_url'] = reverse_lazy('create-collectivite', kwargs={
'connecteur': self.get_object().slug})
return context

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,13 @@
# -*- coding: utf-8 -*-
import pytest
"""Testing forms."""
import os
import base64
import datetime
import pytest
from django.core.files import File
from atreal_openads.forms import (
@ -38,7 +41,9 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
# pylint: disable=unused-argument,invalid-name
def atreal_openads(db):
"""Return an instance of a connector AtrealOpenads."""
return AtrealOpenads.objects.create(
slug=CONNECTOR_SLUG,
default_collectivite_openADS_id=COLLECTIVITE,
@ -48,16 +53,20 @@ def atreal_openads(db):
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
"""Return an instance of a 'Collectivite'."""
return Collectivite.objects.create( # pylint: disable=no-member
name=u'Macollectivité',
connecteur=atreal_openads,
openADS_id='3'
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
"""Return an instance of a 'Guichet'."""
return Guichet.objects.create( # pylint: disable=no-member
collectivite=collectivite_1,
ouverture_jour_h=datetime.time(9, 0),
fermeture_jour_h=datetime.time(17, 0),
@ -68,11 +77,14 @@ def collectivite_1_guichet(db, atreal_openads, collectivite_1):
)
# pylint: disable=unused-argument,redefined-outer-name
def test_forwardfile_form(atreal_openads, collectivite_1):
"""Test for ForwardFileForm."""
form = ForwardFileForm()
assert form.instance is not None
ff = ForwardFile(
forwardfile = ForwardFile(
connecteur=None,
collectivite=None,
numero_demande='45641531',
@ -85,18 +97,21 @@ def test_forwardfile_form(atreal_openads, collectivite_1):
upload_status='pending'
)
form_with_instance = ForwardFileForm(instance=ff, collectivite=collectivite_1)
assert form_with_instance.instance is ff
form_with_instance = ForwardFileForm(instance=forwardfile, collectivite=collectivite_1)
assert form_with_instance.instance is forwardfile
assert form_with_instance.instance.collectivite is collectivite_1
form_with_instance = ForwardFileForm(instance=ff, connecteur=atreal_openads)
assert form_with_instance.instance is ff
form_with_instance = ForwardFileForm(instance=forwardfile, connecteur=atreal_openads)
assert form_with_instance.instance is forwardfile
assert form_with_instance.instance.connecteur is atreal_openads
# TODO check the queryset of the collectivite
# pylint: disable=unused-argument,redefined-outer-name
def test_collectivite_form(atreal_openads):
"""Test for CollectiviteForm."""
form = CollectiviteForm()
assert form.instance is not None
@ -111,7 +126,10 @@ def test_collectivite_form(atreal_openads):
assert form_with_instance.instance.connecteur is atreal_openads
# pylint: disable=unused-argument,redefined-outer-name
def test_guichet_form(atreal_openads, collectivite_1):
"""Test for GuichetForm."""
form = GuichetForm()
assert form.instance is not None

View File

@ -1,17 +1,14 @@
# -*- coding: utf-8 -*-
# to run it use the following command in the 'tests' directory:
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv
#
# and with 'coverage':
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads
"""Testing utilities functions."""
import pytest
import os
import base64
import re
import datetime
import pytest
from django.core.files import File
from django.core.files.base import ContentFile
@ -54,7 +51,9 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
# pylint: disable=unused-argument,invalid-name
def atreal_openads(db):
"""Return an instance of a connector AtrealOpenads."""
return AtrealOpenads.objects.create(
slug=CONNECTOR_SLUG,
default_collectivite_openADS_id=COLLECTIVITE,
@ -64,16 +63,20 @@ def atreal_openads(db):
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
"""Return an instance of a 'Collectivite'."""
return Collectivite.objects.create( # pylint: disable=no-member
name=u'Macollectivité',
connecteur=atreal_openads,
openADS_id='3'
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
"""Return an instance of a 'Guichet'."""
return Guichet.objects.create( # pylint: disable=no-member
collectivite=collectivite_1,
ouverture_jour_h=datetime.time(9, 0),
fermeture_jour_h=datetime.time(17, 0),
@ -85,14 +88,19 @@ def collectivite_1_guichet(db, atreal_openads, collectivite_1):
def test_to_dash_case():
s = 'ACamelCaseName'
assert to_dash_case(s) == 'a-camel-case-name'
"""Test for function 'to_dash_case()'."""
astring = 'ACamelCaseName'
assert to_dash_case(astring) == 'a-camel-case-name'
assert to_dash_case('') == ''
def test_force_encoded_string_output():
def test_force_encoded_string_output(): # pylint: disable=invalid-name
"""Test for function 'force_encoded_string_output()'."""
def a_str_function():
"""Return a hardcoded string 'toto'."""
return str('toto')
ret = force_encoded_string_output(a_str_function)()
assert isinstance(ret, str)
@ -100,6 +108,7 @@ def test_force_encoded_string_output():
assert isinstance(ret, str)
def an_unicode_function():
"""Return a hardcoded string 'toto' in unicode."""
return u'toto'
ret = force_encoded_string_output(an_unicode_function)()
assert isinstance(ret, str)
@ -108,52 +117,65 @@ def test_force_encoded_string_output():
def test_strip_tags():
s = 'aaa b cc '
assert strip_tags(s) == s
"""Test for function 'strip_tags()'."""
ss = s + '<em>dd'
assert strip_tags(ss) == s + 'dd'
base_string = 'aaa b cc '
assert strip_tags(base_string) == base_string
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
astring = base_string + '<em>dd'
assert strip_tags(astring) == base_string + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
astring = base_string + '<em>dd</em>'
assert strip_tags(astring) == base_string + 'dd'
ss = s + ' 1 < 3'
assert strip_tags(ss) == s + ' 1 < 3'
astring = base_string + '<em>dd</em>'
assert strip_tags(astring) == base_string + 'dd'
astring = base_string + ' 1 < 3'
assert strip_tags(astring) == base_string + ' 1 < 3'
def test_clean_spaces():
s = 'aaa b cc '
assert clean_spaces(s) == 'aaa b cc'
"""Test for function 'clean_spaces()'."""
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(s) == 'a a b b c c d d'
astring = 'aaa b cc '
assert clean_spaces(astring) == 'aaa b cc'
astring = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(astring) == 'a a b b c c d d'
def test_normalize():
"""Test for function 'normalize()'."""
assert normalize(None) == ''
s = 'aaa b cc '
assert normalize(s) == 'aaa b cc'
astring = 'aaa b cc '
assert normalize(astring) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(s) == 'a a b b c c d d'
astring = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(astring) == 'a a b b c c d d'
def test_get_file_data():
"""Test for function 'get_file_data()'."""
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
def test_get_file_digest():
with open(TEST_FILE_CERFA_DIA) as fd:
assert get_file_digest(fd) == 'cc90a620982760fdee16a5b4fe1b5ac3b4fe868fd02d2f70b27f1e46d283ea51'
"""Test for function 'get_file_digest()'."""
with open(TEST_FILE_CERFA_DIA) as file_pt:
assert get_file_digest(file_pt) == ('cc90a620982760fdee16a5b4fe1b5ac3'
'b4fe868fd02d2f70b27f1e46d283ea51')
def test_get_upload_path():
ff = ForwardFile(
"""Test for function 'get_upload_path()'."""
forwardfile = ForwardFile(
numero_demande='45641531',
numero_dossier=FAKE_NUMERO_DOSSIER,
type_fichier='CERFA',
@ -167,10 +189,12 @@ def test_get_upload_path():
)
regex = r"^to_openADS__%s__%s\.pdf$" % (
'[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'ffdf')
assert re.search(regex, get_upload_path(ff))
assert re.search(regex, get_upload_path(forwardfile))
def test_get_file_extension():
"""Test for function 'get_file_extension()'."""
assert get_file_extension('afile.pdf') == '.pdf'
assert get_file_extension('afile', 'application/pdf') == '.pdf'
assert get_file_extension('') == ''
@ -178,65 +202,78 @@ def test_get_file_extension():
def test_trunc_str_values():
d = {}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789'}
assert trunc_str_values(d, 0) == {'a': u''}
d = {'a': '123456789'}
assert trunc_str_values(d, 1) == {'a': u'1…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 2) == {'a': u'12…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 5) == {'a': u'12345…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 9) == {'a': u'123456789'}
d = {'a': '123456789'}
assert trunc_str_values(d, 10) == d
"""Test for function 'trunc_str_values()'."""
d = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
dic = {}
assert trunc_str_values(dic, 10) == dic
dic = {'a': '123456789'}
assert trunc_str_values(dic, 0) == {'a': u''}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 1) == {'a': u'1…'}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 2) == {'a': u'12…'}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 5) == {'a': u'12345…'}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 8) == {'a': u'12345678…'}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 9) == {'a': u'123456789'}
dic = {'a': '123456789'}
assert trunc_str_values(dic, 10) == dic
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
dic = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(dic, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
dic = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(dic, 5) == {'a': u'12345…', 'b123456789': u'98765…',
'c': {'c1': u'ABCDE…'}}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
dic = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'},
'd': ['123456789']}
assert trunc_str_values(dic, 5) == {'a': u'12345…', 'b123456789': u'98765…',
'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
dic = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'},
'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(dic, 5) == {'a': u'12345…', 'b123456789': u'98765…',
'c': {'c1': u'ABCDE…'},
'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
def test_dict_dumper():
d = {}
"""Test for methods of class 'DictDumper'."""
dd = DictDumper(d, use_json_dumps=False)
assert repr(dd) == (u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
d, dd.max_str_len, dd.use_json_dumps)).encode('utf-8')
assert str(dd) == '{}'
assert unicode(dd) == u'{}'
dic = {}
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 0, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dumped = DictDumper(dic, use_json_dumps=False)
assert repr(dumped) == (u'DictDumper(dic=%r,max_str_len=%r,use_json_dumps=%r)' % (
dic, dumped.max_str_len, dumped.use_json_dumps)).encode('utf-8')
assert str(dumped) == '{}'
assert unicode(dumped) == u'{}'
d = {'a': '123456789'}
dd = DictDumper(d, 10, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 5, use_json_dumps=False)
assert d == dd.dic
assert unicode(dd) == unicode({'a': u'12345…'})
dd = DictDumper(d, 5, use_json_dumps=True)
assert d == dd.dic
assert unicode(dd) == u'{"a": "12345\\u2026"}'
assert dic == dumped.dic
assert unicode(dic) == unicode(dumped)
dumped = DictDumper(dic, 0, use_json_dumps=False)
assert dic == dumped.dic
assert unicode(dic) == unicode(dumped)
dic = {'a': '123456789'}
dumped = DictDumper(dic, 10, use_json_dumps=False)
assert dic == dumped.dic
assert unicode(dic) == unicode(dumped)
dumped = DictDumper(dic, 5, use_json_dumps=False)
assert dic == dumped.dic
assert unicode(dumped) == unicode({'a': u'12345…'})
dumped = DictDumper(dic, 5, use_json_dumps=True)
assert dic == dumped.dic
assert unicode(dumped) == u'{"a": "12345\\u2026"}'
# pylint: disable=unused-argument,redefined-outer-name
def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet):
ff = ForwardFile(
"""Test for methods of class 'BaseModel' through instance of a ForwardFile."""
forwardfile = ForwardFile(
numero_demande='45641531',
numero_dossier=FAKE_NUMERO_DOSSIER,
type_fichier='CERFA',
@ -249,24 +286,24 @@ def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet):
collectivite=None
)
assert ff.get_verbose_name() == 'Forward File'
assert ff.get_verbose_name_plural() == 'Forward Files'
assert forwardfile.get_verbose_name() == 'Forward File'
assert forwardfile.get_verbose_name_plural() == 'Forward Files'
assert ff.get_class_name() == 'ForwardFile'
assert ff.get_class_name_plural() == 'ForwardFiles'
assert forwardfile.get_class_name() == 'ForwardFile'
assert forwardfile.get_class_name_plural() == 'ForwardFiles'
assert ff.get_class_name_dash_case() == 'forward-file'
assert ff.get_class_name_plural_dash_case() == 'forward-files'
assert forwardfile.get_class_name_dash_case() == 'forward-file'
assert forwardfile.get_class_name_plural_dash_case() == 'forward-files'
assert ff.get_class_name_title() == 'Forward File'
assert ff.get_class_name_plural_title() == 'Forward Files'
assert forwardfile.get_class_name_title() == 'Forward File'
assert forwardfile.get_class_name_plural_title() == 'Forward Files'
assert ff.get_url_name('list', plural=True) == 'list-forward-files'
assert forwardfile.get_url_name('list', plural=True) == 'list-forward-files'
assert ff.get_absolute_url() == '/manage/atreal-openads/atreal/forward-file/None'
assert ff.get_edit_url() == '/manage/atreal-openads/atreal/edit-forward-file/None'
assert ff.get_delete_url() == '/manage/atreal-openads/atreal/delete-forward-file/None'
assert ff.get_list_url() == '/manage/atreal-openads/atreal/forward-files'
assert forwardfile.get_absolute_url() == '/manage/atreal-openads/atreal/forward-file/None'
assert forwardfile.get_edit_url() == '/manage/atreal-openads/atreal/edit-forward-file/None'
assert forwardfile.get_delete_url() == '/manage/atreal-openads/atreal/delete-forward-file/None'
assert forwardfile.get_list_url() == '/manage/atreal-openads/atreal/forward-files'
assert atreal_openads.get_class_name_plural() == 'AtrealOpenads'
@ -276,13 +313,12 @@ def test_base_model(atreal_openads, collectivite_1, collectivite_1_guichet):
assert params['connector'] == 'atreal-openads'
assert params['slug'] == atreal_openads.slug
with pytest.raises(Exception) as e:
with pytest.raises(Exception) as exception:
atreal_openads.get_list_url()
assert unicode(e.value) == u"AtrealOpenads:get_list_url() method should not be called"
assert unicode(exception.value) == u"AtrealOpenads:get_list_url() method should not be called"
# TODO add more collectivite test cases
with pytest.raises(Exception) as e:
with pytest.raises(Exception) as exception:
collectivite_1_guichet.get_list_url()
assert unicode(e.value) == u"Guichet:get_list_url() method should not be called"
assert unicode(exception.value) == u"Guichet:get_list_url() method should not be called"

View File

@ -1,10 +1,13 @@
# -*- coding: utf-8 -*-
import pytest
"""Testing views."""
import os
import base64
import datetime
import pytest
from django.http.request import HttpRequest, QueryDict
from django.urls.base import resolve
from django.core.files import File
@ -53,7 +56,9 @@ TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
@pytest.fixture
# pylint: disable=unused-argument,invalid-name
def atreal_openads(db):
"""Return an instance of a connector AtrealOpenads."""
return AtrealOpenads.objects.create(
slug=CONNECTOR_SLUG,
default_collectivite_openADS_id=COLLECTIVITE,
@ -63,16 +68,20 @@ def atreal_openads(db):
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1(db, atreal_openads):
return Collectivite.objects.create(
"""Return an instance of a 'Collectivite'."""
return Collectivite.objects.create( # pylint: disable=no-member
name=u'Macollectivité',
connecteur=atreal_openads,
openADS_id='3'
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def collectivite_1_guichet(db, atreal_openads, collectivite_1):
return Guichet.objects.create(
"""Return an instance of a 'Guichet'."""
return Guichet.objects.create( # pylint: disable=no-member
collectivite=collectivite_1,
ouverture_jour_h=datetime.time(9, 0),
fermeture_jour_h=datetime.time(17, 0),
@ -83,8 +92,10 @@ def collectivite_1_guichet(db, atreal_openads, collectivite_1):
)
@pytest.fixture
# pylint: disable=unused-argument,redefined-outer-name,invalid-name
def forwardfile_1(db, atreal_openads, collectivite_1):
return ForwardFile.objects.create(
"""Return an instance of a 'ForwardFile'."""
return ForwardFile.objects.create( # pylint: disable=no-member
connecteur=atreal_openads,
collectivite=collectivite_1,
numero_demande='45641531',
@ -98,7 +109,10 @@ def forwardfile_1(db, atreal_openads, collectivite_1):
)
def test_get_connecteur_from_request(atreal_openads, forwardfile_1):
# pylint: disable=unused-argument,redefined-outer-name
def test_get_connecteur_from_request(atreal_openads, forwardfile_1): # pylint: disable=invalid-name
"""Test for function 'get_connecteur_from_request()'."""
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
atreal_openads.slug, forwardfile_1.id)
@ -109,7 +123,7 @@ def test_get_connecteur_from_request(atreal_openads, forwardfile_1):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = ForwardFileView()
@ -120,7 +134,10 @@ def test_get_connecteur_from_request(atreal_openads, forwardfile_1):
assert connecteur.slug == atreal_openads.slug
def test_get_collectivite_from_request(atreal_openads, collectivite_1):
# pylint: disable=unused-argument,redefined-outer-name
def test_get_collectivite_from_request(atreal_openads, collectivite_1): # pylint: disable=invalid-name
"""Test for function 'get_collectivite_from_request()'."""
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
@ -131,7 +148,7 @@ def test_get_collectivite_from_request(atreal_openads, collectivite_1):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = ForwardFileListView()
@ -142,7 +159,10 @@ def test_get_collectivite_from_request(atreal_openads, collectivite_1):
assert collectivite.id == collectivite_1.id
# pylint: disable=too-many-statements
def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
"""Test for views 'ForwardFile*View'."""
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/forward-file/%s' % (
atreal_openads.slug, forwardfile_1.id)
@ -153,7 +173,7 @@ def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = ForwardFileView()
@ -186,7 +206,7 @@ def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
del req.GET['back-to']
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-forward-files'
@ -208,23 +228,23 @@ def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite'].id == collectivite_1.id
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['-last_update_datetime']
assert qs.ordered
queryset = view.get_queryset()
assert queryset.query is not None
assert queryset.query.order_by == ['id']
assert queryset.query.default_ordering
assert queryset.query.get_meta().ordering == ['-last_update_datetime']
assert queryset.ordered
req.GET['order-by'] = '-id'
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['-id']
assert qs.query.default_ordering == True
queryset = view.get_queryset()
assert queryset.query is not None
assert queryset.query.order_by == ['-id']
assert queryset.query.default_ordering
req.path = '/manage/atreal-openads/%s/forward-files' % atreal_openads.slug
req.resolver_match = resolve(req.path)
del(req.GET['back-to'])
del(req.GET['order-by'])
del req.GET['back-to']
del req.GET['order-by']
view = ForwardFileListView()
view.request = req
view.object_list = []
@ -232,15 +252,18 @@ def test_forwardfile_view(atreal_openads, collectivite_1, forwardfile_1):
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['-last_update_datetime']
assert qs.ordered
queryset = view.get_queryset()
assert queryset.query is not None
assert queryset.query.order_by == ['id']
assert queryset.query.default_ordering
assert queryset.query.get_meta().ordering == ['-last_update_datetime']
assert queryset.ordered
# pylint: disable=too-many-statements
def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
"""Test for views 'Collectivite*View'."""
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s' % (
atreal_openads.slug, collectivite_1.id)
@ -251,7 +274,7 @@ def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = CollectiviteView()
@ -260,10 +283,12 @@ def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['guichet_add_url'] == u'/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
atreal_openads.slug, collectivite_1.id)
assert context['forward_files_list_url'] == u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id)
assert context['guichet_add_url'] == (
u'/manage/atreal-openads/%s/collectivite/%s/create-guichet' % (
atreal_openads.slug, collectivite_1.id))
assert context['forward_files_list_url'] == (
u'/manage/atreal-openads/%s/collectivite/%s/forward-files' % (
atreal_openads.slug, collectivite_1.id))
view = CollectiviteUpdateView()
view.request = req
@ -284,7 +309,7 @@ def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
del req.GET['back-to']
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-collectivites'
@ -301,7 +326,7 @@ def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
assert context['connecteur'].slug == atreal_openads.slug
kwargs = view.get_form_kwargs()
assert kwargs['connecteur'].slug == atreal_openads.slug
del(req.GET['back-to'])
del req.GET['back-to']
url = view.get_success_url()
assert url == u'/atreal-openads/%s/' % atreal_openads.slug
req.GET['back-to'] = 'list-collectivites'
@ -316,23 +341,26 @@ def test_collectivite_view(atreal_openads, collectivite_1, forwardfile_1):
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['connecteur'].slug == atreal_openads.slug
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
assert context['collectivite_add_url'] == (
u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug)
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['id']
assert qs.query.default_ordering == True
assert qs.query.get_meta().ordering == ['name']
assert qs.ordered
queryset = view.get_queryset()
assert queryset.query is not None
assert queryset.query.order_by == ['id']
assert queryset.query.default_ordering
assert queryset.query.get_meta().ordering == ['name']
assert queryset.ordered
req.GET['order-by'] = '-id'
qs = view.get_queryset()
assert qs.query is not None
assert qs.query.order_by == ['-id']
assert qs.query.default_ordering == True
queryset = view.get_queryset()
assert queryset.query is not None
assert queryset.query.order_by == ['-id']
assert queryset.query.default_ordering
def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet):
"""Test for views 'Guichet*View'."""
req = HttpRequest()
req.path = '/manage/atreal-openads/%s/collectivite/%s/guichet/%s' % (
atreal_openads.slug, collectivite_1.id, collectivite_1_guichet.id)
@ -343,7 +371,7 @@ def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = GuichetView()
@ -391,6 +419,8 @@ def test_guichet_view(atreal_openads, collectivite_1, collectivite_1_guichet):
def test_connecteur_view(atreal_openads):
"""Test for views 'AtrealOpenadsView'."""
req = HttpRequest()
req.path = '/atreal-openads/%s/' % atreal_openads.slug
req.method = 'GET'
@ -400,7 +430,7 @@ def test_connecteur_view(atreal_openads):
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
req._read_started = False # pylint: disable=protected-access
req.resolver_match = resolve(req.path)
view = AtrealOpenadsView()
@ -409,5 +439,5 @@ def test_connecteur_view(atreal_openads):
view.kwargs = req.resolver_match.kwargs
context = view.get_context_data()
assert context['collectivite_fields'] == Collectivite.get_fields()
assert context['collectivite_add_url'] == u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug
assert context['collectivite_add_url'] == (
u'/manage/atreal-openads/%s/create-collectivite' % atreal_openads.slug)