toulouse_axel: move some functions in utils (#39126)

This commit is contained in:
Lauréline Guérin 2020-01-24 15:30:56 +01:00 committed by Thomas NOEL
parent b5e2d69a3f
commit bb4161fd22
4 changed files with 167 additions and 129 deletions

View File

@ -1,5 +1,5 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
@ -36,35 +36,17 @@ from passerelle.compat import json_loads
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.xml import JSONSchemaFromXMLSchema
from . import utils
logger = logging.getLogger('passerelle.contrib.toulouse_axel')
BASE_XSD_PATH = os.path.join(os.path.dirname(__file__), 'xsd')
boolean_type = {
'oneOf': [
{'type': 'boolean'},
{
'type': 'string',
'pattern': '[Oo][Uu][Ii]|[Nn][Oo][Nn]|[Tt][Rr][Uu][Ee]|[Ff][Aa][Ll][Ss][Ee]|1|0',
}
]
}
datetime_type = {
'type': 'string',
'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}',
}
json_date_format = '%Y-%m-%d'
json_datetime_format = '%Y-%m-%dT%H:%M:%S'
xml_date_format = '%d/%m/%Y'
xml_datetime_format = '%d/%m/%Y %H:%M:%S'
PAYMENT_SCHEMA = {
'type': 'object',
'properties': {
'transaction_date': copy.deepcopy(datetime_type),
'transaction_date': copy.deepcopy(utils.datetime_type),
'transaction_id': {
'type': 'string',
}
@ -73,58 +55,6 @@ PAYMENT_SCHEMA = {
}
def indent(tree, space=" ", level=0):
# backport from Lib/xml/etree/ElementTree.py python 3.9
if isinstance(tree, ET.ElementTree):
tree = tree.getroot()
if level < 0:
raise ValueError("Initial indentation level must be >= 0, got {level}".format(level))
if not len(tree):
return
# Reduce the memory consumption by reusing indentation strings.
indentations = ["\n" + level * space]
def _indent_children(elem, level):
# Start a new indentation level for the first child.
child_level = level + 1
try:
child_indentation = indentations[child_level]
except IndexError:
child_indentation = indentations[level] + space
indentations.append(child_indentation)
if not elem.text or not elem.text.strip():
elem.text = child_indentation
for child in elem:
if len(child):
_indent_children(child, child_level)
if not child.tail or not child.tail.strip():
child.tail = child_indentation
# Dedent after the last child by overwriting the previous indentation.
if not child.tail.strip():
child.tail = indentations[level]
_indent_children(tree, 0)
def encode_bool(obj):
if obj is True or str(obj).lower() in ['true', 'oui', '1']:
return 'OUI'
if obj is False or str(obj).lower() in ['false', 'non', '0']:
return 'NON'
return obj
def encode_datetime(obj):
try:
return datetime.datetime.strptime(obj, json_datetime_format).strftime(xml_datetime_format)
except ValueError:
return obj
class AxelSchema(JSONSchemaFromXMLSchema):
type_map = {
'{urn:AllAxelTypes}DATEREQUIREDType': 'date',
@ -142,7 +72,7 @@ class AxelSchema(JSONSchemaFromXMLSchema):
def encode_date(self, obj):
try:
return datetime.datetime.strptime(obj, json_date_format).strftime(xml_date_format)
return datetime.datetime.strptime(obj, utils.json_date_format).strftime(utils.xml_date_format)
except ValueError:
return obj
@ -152,7 +82,7 @@ class AxelSchema(JSONSchemaFromXMLSchema):
return self.encode_date(obj)
def decode_date(self, data):
value = datetime.datetime.strptime(data.text, xml_date_format).strftime(json_date_format)
value = datetime.datetime.strptime(data.text, utils.xml_date_format).strftime(utils.json_date_format)
return xmlschema.ElementData(tag=data.tag, text=value, content=data.content, attributes=data.attributes)
def decode_date_optional(self, data):
@ -162,10 +92,10 @@ class AxelSchema(JSONSchemaFromXMLSchema):
@classmethod
def schema_bool(cls):
return copy.deepcopy(boolean_type)
return copy.deepcopy(utils.boolean_type)
def encode_bool(self, obj):
return encode_bool(obj)
return utils.encode_bool(obj)
def decode_bool(self, data):
value = False
@ -236,7 +166,7 @@ class Operation(object):
serialized_request = self.request_converter.encode(request_data)
except xmlschema.XMLSchemaValidationError as e:
raise AxelError('invalid request %s' % str(e))
indent(serialized_request)
utils.indent(serialized_request)
serialized_request = force_text(ET.tostring(serialized_request))
try:
self.request_converter.xml_schema.validate(serialized_request)
@ -251,7 +181,7 @@ class Operation(object):
'') # FIXME: What is the user parameter for ?
xml_result = ET.fromstring(result.encode('utf-8'))
indent(xml_result)
utils.indent(xml_result)
pretty_result = force_text(ET.tostring(xml_result))
if xml_result.find('RESULTAT/STATUS').text != 'OK':
msg = xml_result.find('RESULTAT/COMMENTAIRES').text
@ -335,7 +265,7 @@ class ToulouseAxel(BaseResource):
form_maj_famille_dui.request_schema['properties']['PORTAIL']['properties']['DUI'])
for flag in sorted(UPDATE_FAMILY_FLAGS.keys()):
flag_type = copy.deepcopy(boolean_type)
flag_type = copy.deepcopy(utils.boolean_type)
if flag not in UPDATE_FAMILY_REQUIRED_FLAGS:
flag_type['oneOf'].append({'type': 'null'})
flag_type['oneOf'].append({'type': 'string', 'enum': ['']})
@ -384,7 +314,7 @@ class ToulouseAxel(BaseResource):
'required': ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES', 'AUTRES'],
}
for key in ['ASTHME', 'MEDICAMENTEUSES', 'ALIMENTAIRES']:
sanitaire_properties['ALLERGIE']['properties'][key] = copy.deepcopy(boolean_type)
sanitaire_properties['ALLERGIE']['properties'][key] = copy.deepcopy(utils.boolean_type)
sanitaire_properties['ALLERGIE']['properties']['AUTRES'] = {
'oneOf': [
{'type': 'null'},
@ -576,7 +506,7 @@ class ToulouseAxel(BaseResource):
flags = sorted(self.UPDATE_FAMILY_FLAGS.keys())
for flag in flags:
flag_value = post_data.get(flag)
flag_value = encode_bool(flag_value)
flag_value = utils.encode_bool(flag_value)
# no update for the related block
if flag_value == 'OUI':
@ -754,40 +684,6 @@ class ToulouseAxel(BaseResource):
}
}
def normalize_invoice(self, invoice, dui, historical=False, vendor_base=None):
vendor = vendor_base or {}
vendor.update(invoice)
invoice_id = '%s-%s' % (dui, invoice['IDFACTURE'])
if historical:
invoice_id = 'historical-%s' % invoice_id
data = {
'id': invoice_id,
'display_id': str(invoice['IDFACTURE']),
'label': invoice['LIBELLE'],
'paid': False,
'vendor': {'toulouse-axel': vendor},
}
if historical:
data.update({
'amount': 0,
'total_amount': invoice['MONTANT'],
'created': invoice['EMISSION'],
'pay_limit_date': '',
'online_payment': False,
'has_pdf': invoice['IPDF'] == '1',
})
else:
data.update({
'amount': invoice['RESTEAPAYER'],
'total_amount': invoice['MONTANTTOTAL'],
'created': invoice['DATEEMISSION'],
'pay_limit_date': invoice['DATEECHEANCE'],
'has_pdf': invoice['EXISTEPDF'] == '1',
})
pay_limit_date = datetime.datetime.strptime(invoice['DATEECHEANCE'], '%Y-%m-%d').date()
data['online_payment'] = data['amount'] > 0 and pay_limit_date >= datetime.date.today()
return data
def get_invoices(self, regie_id, dui=None, name_id=None):
assert name_id or dui
if name_id:
@ -807,7 +703,7 @@ class ToulouseAxel(BaseResource):
for facture in data.get('FACTURES', []):
if facture['IDREGIE'] != regie_id:
continue
result.append(self.normalize_invoice(facture, dui))
result.append(utils.normalize_invoice(facture, dui))
return result
def get_historical_invoices(self, name_id):
@ -828,7 +724,7 @@ class ToulouseAxel(BaseResource):
for direction in data.get('DIRECTION', []):
for facture in direction.get('FACTURE', []):
result.append(
self.normalize_invoice(
utils.normalize_invoice(
facture,
link.dui,
historical=True,
@ -968,7 +864,7 @@ class ToulouseAxel(BaseResource):
transaction_amount = invoice['amount']
transaction_id = data['transaction_id']
transaction_date = encode_datetime(data['transaction_date'])
transaction_date = utils.encode_datetime(data['transaction_date'])
post_data = {
'IDFACTURE': int(invoice_id),
'IDREGIEENCAISSEMENT': '',

View File

@ -0,0 +1,124 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import xml.etree.ElementTree as ET
boolean_type = {
'oneOf': [
{'type': 'boolean'},
{
'type': 'string',
'pattern': '[Oo][Uu][Ii]|[Nn][Oo][Nn]|[Tt][Rr][Uu][Ee]|[Ff][Aa][Ll][Ss][Ee]|1|0',
}
]
}
datetime_type = {
'type': 'string',
'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}',
}
json_date_format = '%Y-%m-%d'
json_datetime_format = '%Y-%m-%dT%H:%M:%S'
xml_date_format = '%d/%m/%Y'
xml_datetime_format = '%d/%m/%Y %H:%M:%S'
def indent(tree, space=" ", level=0):
# backport from Lib/xml/etree/ElementTree.py python 3.9
if isinstance(tree, ET.ElementTree):
tree = tree.getroot()
if level < 0:
raise ValueError("Initial indentation level must be >= 0, got {level}".format(level))
if not len(tree):
return
# Reduce the memory consumption by reusing indentation strings.
indentations = ["\n" + level * space]
def _indent_children(elem, level):
# Start a new indentation level for the first child.
child_level = level + 1
try:
child_indentation = indentations[child_level]
except IndexError:
child_indentation = indentations[level] + space
indentations.append(child_indentation)
if not elem.text or not elem.text.strip():
elem.text = child_indentation
for child in elem:
if len(child):
_indent_children(child, child_level)
if not child.tail or not child.tail.strip():
child.tail = child_indentation
# Dedent after the last child by overwriting the previous indentation.
if not child.tail.strip():
child.tail = indentations[level]
_indent_children(tree, 0)
def encode_bool(obj):
if obj is True or str(obj).lower() in ['true', 'oui', '1']:
return 'OUI'
if obj is False or str(obj).lower() in ['false', 'non', '0']:
return 'NON'
return obj
def encode_datetime(obj):
try:
return datetime.datetime.strptime(obj, json_datetime_format).strftime(xml_datetime_format)
except ValueError:
return obj
def normalize_invoice(invoice, dui, historical=False, vendor_base=None):
vendor = vendor_base or {}
vendor.update(invoice)
invoice_id = '%s-%s' % (dui, invoice['IDFACTURE'])
if historical:
invoice_id = 'historical-%s' % invoice_id
data = {
'id': invoice_id,
'display_id': str(invoice['IDFACTURE']),
'label': invoice['LIBELLE'],
'paid': False,
'vendor': {'toulouse-axel': vendor},
}
if historical:
data.update({
'amount': 0,
'total_amount': invoice['MONTANT'],
'created': invoice['EMISSION'],
'pay_limit_date': '',
'online_payment': False,
'has_pdf': invoice['IPDF'] == '1',
})
else:
data.update({
'amount': invoice['RESTEAPAYER'],
'total_amount': invoice['MONTANTTOTAL'],
'created': invoice['DATEEMISSION'],
'pay_limit_date': invoice['DATEECHEANCE'],
'has_pdf': invoice['EXISTEPDF'] == '1',
})
pay_limit_date = datetime.datetime.strptime(invoice['DATEECHEANCE'], '%Y-%m-%d').date()
data['online_payment'] = data['amount'] > 0 and pay_limit_date >= datetime.date.today()
return data

View File

@ -17,7 +17,6 @@
import os
from passerelle.contrib.toulouse_axel.models import AxelSchema
from passerelle.contrib.toulouse_axel.models import encode_datetime
import pytest
import xmlschema
@ -61,15 +60,6 @@ def test_date_mapping(date_type):
assert json_data['DATE'] is None
def test_encode_datetime():
# wrong format
assert encode_datetime('foo') == 'foo'
assert encode_datetime('2019-12-12') == '2019-12-12'
assert encode_datetime('2019-12-12T12:01:72') == '2019-12-12T12:01:72'
# ok
assert encode_datetime('2019-12-12T12:01:42') == '12/12/2019 12:01:42'
@pytest.mark.parametrize('bool_type', ['OUINONREQUIREDType', 'OUINONType'])
@pytest.mark.parametrize('value, expected, py_expected', [
('OUI', 'OUI', True),

View File

@ -0,0 +1,28 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from passerelle.contrib.toulouse_axel.utils import (
encode_datetime,
)
def test_encode_datetime():
# wrong format
assert encode_datetime('foo') == 'foo'
assert encode_datetime('2019-12-12') == '2019-12-12'
assert encode_datetime('2019-12-12T12:01:72') == '2019-12-12T12:01:72'
# ok
assert encode_datetime('2019-12-12T12:01:42') == '12/12/2019 12:01:42'