diff --git a/passerelle/apps/cartads_cs/models.py b/passerelle/apps/cartads_cs/models.py
index 2e04fd0a..21791a98 100644
--- a/passerelle/apps/cartads_cs/models.py
+++ b/passerelle/apps/cartads_cs/models.py
@@ -799,42 +799,43 @@ class AbstractCartaDSCS(BaseResource):
def pack(self, dossier_id):
dossier = CartaDSDossier.objects.get(id=dossier_id)
zip_filename = os.path.join(default_storage.path('cartads_cs'), '%s.zip' % dossier.tracking_code)
- zip_file = zipfile.ZipFile(zip_filename, mode='w')
- liste_pdf = self.liste_pdf(None, dossier.type_dossier_id)
- cerfa_id = liste_pdf['data'][0]['id']
- for cerfa in liste_pdf['data']:
- if cerfa['id'] == 'AUTRES_DEMANDEURS':
- continue
- cerfa_id = cerfa['id']
- break
- cerfa_id = cerfa_id.replace('*', '-')
- pieces = self.pieces(None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code)
- for piece in pieces['data']:
- cnt = 1
- for file in piece['files']:
- if not file.get('id'):
+ with zipfile.ZipFile(zip_filename, mode='w') as zip_file:
+ liste_pdf = self.liste_pdf(None, dossier.type_dossier_id)
+ cerfa_id = liste_pdf['data'][0]['id']
+ for cerfa in liste_pdf['data']:
+ if cerfa['id'] == 'AUTRES_DEMANDEURS':
continue
- cartads_file = CartaDSFile.objects.get(id=file['id'])
- if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id):
- zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id)
- elif piece['id'].startswith('cerfa-autres-'):
- zip_file.write(
- cartads_file.uploaded_file.path,
- 'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt,
- )
- else:
- zip_file.write(
- cartads_file.uploaded_file.path,
- 'Pieces/%s-%s%s%s'
- % (
- piece['id'],
- piece['codePiece'],
- cnt,
- os.path.splitext(cartads_file.uploaded_file.path)[-1],
- ),
- )
- cnt += 1
- zip_file.close()
+ cerfa_id = cerfa['id']
+ break
+ cerfa_id = cerfa_id.replace('*', '-')
+ pieces = self.pieces(
+ None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code
+ )
+ for piece in pieces['data']:
+ cnt = 1
+ for file in piece['files']:
+ if not file.get('id'):
+ continue
+ cartads_file = CartaDSFile.objects.get(id=file['id'])
+ if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id):
+ zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id)
+ elif piece['id'].startswith('cerfa-autres-'):
+ zip_file.write(
+ cartads_file.uploaded_file.path,
+ 'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt,
+ )
+ else:
+ zip_file.write(
+ cartads_file.uploaded_file.path,
+ 'Pieces/%s-%s%s%s'
+ % (
+ piece['id'],
+ piece['codePiece'],
+ cnt,
+ os.path.splitext(cartads_file.uploaded_file.path)[-1],
+ ),
+ )
+ cnt += 1
dossier.zip_ready = True
dossier.save()
self.add_job('send_to_cartads', dossier_id=dossier.id)
@@ -843,8 +844,9 @@ class AbstractCartaDSCS(BaseResource):
ftp = FTP(self.ftp_server)
ftp.login(self.ftp_username, self.ftp_password)
ftp.cwd(self.ftp_client_name)
- ftp.storbinary('STOR %s' % os.path.basename(zip_filename), open(zip_filename, 'rb'))
- ftp.quit()
+ with open(zip_filename, 'rb') as fd:
+ ftp.storbinary('STOR %s' % os.path.basename(zip_filename), fd)
+ ftp.quit()
def send_to_cartads(self, dossier_id):
dossier = CartaDSDossier.objects.get(id=dossier_id)
diff --git a/passerelle/apps/cryptor/models.py b/passerelle/apps/cryptor/models.py
index bd5df306..2cc1810f 100644
--- a/passerelle/apps/cryptor/models.py
+++ b/passerelle/apps/cryptor/models.py
@@ -214,9 +214,11 @@ class Cryptor(BaseResource):
if not os.path.exists(metadata_filename):
raise APIError('unknown uuid', http_status=404)
- content = read_decrypt(open(content_filename, 'rb'), self.private_key)
+ with open(content_filename, 'rb') as fd:
+ content = read_decrypt(fd, self.private_key)
- metadata = json.load(open(metadata_filename, 'r'))
+ with open(metadata_filename, 'r') as fd:
+ metadata = json.load(fd)
filename = metadata.get('filename')
content_type = metadata.get('content_type')
diff --git a/passerelle/apps/family/management/commands/import_orleans_data.py b/passerelle/apps/family/management/commands/import_orleans_data.py
index c477f9ee..b8d31632 100644
--- a/passerelle/apps/family/management/commands/import_orleans_data.py
+++ b/passerelle/apps/family/management/commands/import_orleans_data.py
@@ -50,14 +50,15 @@ class Command(BaseCommand):
storage = DefaultStorage()
lock_filename = storage.path('family-%s/import-orleans-data.lock' % connector.id)
try:
- fd = open(lock_filename, 'w')
+ fd = open(lock_filename, 'w') # pylint: disable=consider-using-with
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
raise CommandError('Command already running.')
try:
archive_name = os.path.basename(options['archive_file'])
- connector.archive.save(archive_name, File(open(options['archive_file'], 'rb')))
+ with open(options['archive_file'], 'rb') as archive_fd:
+ connector.archive.save(archive_name, File(archive_fd))
except Exception as e:
raise CommandError('Error occured: %s' % e)
finally:
diff --git a/passerelle/apps/family/models.py b/passerelle/apps/family/models.py
index 17493086..88f8aa65 100644
--- a/passerelle/apps/family/models.py
+++ b/passerelle/apps/family/models.py
@@ -174,14 +174,14 @@ class GenericFamily(BaseResource):
def clean(self):
if self.archive:
try:
- archive = zipfile.ZipFile(self.archive)
+ with zipfile.ZipFile(self.archive) as archive:
+ if self.file_format != 'native':
+ modname = 'passerelle.apps.family.loaders.%s' % self.file_format
+ __import__(modname)
+ module = sys.modules[modname]
+ module.Loader(self).clean(archive)
except zipfile.BadZipfile:
raise ValidationError(_('Invalid zip file.'))
- if self.file_format != 'native':
- modname = 'passerelle.apps.family.loaders.%s' % self.file_format
- __import__(modname)
- module = sys.modules[modname]
- module.Loader(self).clean(archive)
return super().clean()
@@ -195,101 +195,101 @@ class GenericFamily(BaseResource):
if not os.path.exists(invoices_dir):
os.makedirs(invoices_dir)
- archive = zipfile.ZipFile(self.archive.path)
- if self.file_format != 'native':
- modname = 'passerelle.apps.family.loaders.%s' % self.file_format
- __import__(modname)
- module = sys.modules[modname]
- module.Loader(self).load(archive)
- return
+ with zipfile.ZipFile(self.archive.path) as archive:
+ if self.file_format != 'native':
+ modname = 'passerelle.apps.family.loaders.%s' % self.file_format
+ __import__(modname)
+ module = sys.modules[modname]
+ module.Loader(self).load(archive)
+ return
- archive_files = archive.namelist()
+ archive_files = archive.namelist()
- family_files = [d for d in archive_files if d.endswith('.json')]
- families = []
- invoices = []
- children = []
- adults = []
+ family_files = [d for d in archive_files if d.endswith('.json')]
+ families = []
+ invoices = []
+ children = []
+ adults = []
- for f in family_files:
- family_data = json_loads(archive.read(f))
- families.append(family_data['id'])
- address = family_data.get('address') or {}
- family_data.update(address)
- data = dict_cherry_pick(
- family_data,
- (
- 'login',
- 'password',
- 'family_quotient',
- ('number', 'street_number'),
- ('postal_code', 'zipcode'),
- ('street', 'street_name'),
- ('complement', 'address_complement'),
- ),
- )
- family, dummy = Family.objects.update_or_create(
- external_id=family_data['id'], resource=self, defaults=data
- )
- for adult in family_data.get('adults') or []:
- adults.append(adult['id'])
- adult_address = adult.get('address') or {}
- adult.update(adult_address)
+ for f in family_files:
+ family_data = json_loads(archive.read(f))
+ families.append(family_data['id'])
+ address = family_data.get('address') or {}
+ family_data.update(address)
data = dict_cherry_pick(
- adult,
+ family_data,
(
- 'first_name',
- 'last_name',
- 'phone',
- ('mobile', 'cellphone'),
- 'sex',
+ 'login',
+ 'password',
+ 'family_quotient',
('number', 'street_number'),
('postal_code', 'zipcode'),
('street', 'street_name'),
('complement', 'address_complement'),
- 'country',
),
)
- Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data)
- # cleanup adults
- Adult.objects.exclude(external_id__in=adults).delete()
-
- for child in family_data.get('children') or []:
- children.append(child['id'])
- data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate'))
- Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data)
- # cleanup children
- Child.objects.exclude(external_id__in=children).delete()
-
- for invoice in family_data['invoices']:
- invoices.append(invoice['id'])
- data = dict_cherry_pick(
- invoice,
- (
- 'label',
- ('created', 'issue_date'),
- 'pay_limit_date',
- 'litigation_date',
- 'total_amount',
- 'payment_date',
- 'amount',
- 'autobilling',
- ),
+ family, dummy = Family.objects.update_or_create(
+ external_id=family_data['id'], resource=self, defaults=data
)
- for date_attribute in data.keys():
- if not date_attribute.endswith('_date'):
- continue
- if date_attribute == 'payment_date':
- data[date_attribute] = get_datetime(data[date_attribute])
- else:
- data[date_attribute] = get_date(data[date_attribute])
- data['paid'] = bool(data.get('payment_date'))
- Invoice.objects.update_or_create(
- resource=self, family=family, external_id=invoice['id'], defaults=data
- )
- if 'invoices/%s.pdf' % invoice['id'] in archive_files:
- with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp:
- fp.write(archive.read('invoices/%s.pdf' % invoice['id']))
+ for adult in family_data.get('adults') or []:
+ adults.append(adult['id'])
+ adult_address = adult.get('address') or {}
+ adult.update(adult_address)
+ data = dict_cherry_pick(
+ adult,
+ (
+ 'first_name',
+ 'last_name',
+ 'phone',
+ ('mobile', 'cellphone'),
+ 'sex',
+ ('number', 'street_number'),
+ ('postal_code', 'zipcode'),
+ ('street', 'street_name'),
+ ('complement', 'address_complement'),
+ 'country',
+ ),
+ )
+ Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data)
+ # cleanup adults
+ Adult.objects.exclude(external_id__in=adults).delete()
+
+ for child in family_data.get('children') or []:
+ children.append(child['id'])
+ data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate'))
+ Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data)
+ # cleanup children
+ Child.objects.exclude(external_id__in=children).delete()
+
+ for invoice in family_data['invoices']:
+ invoices.append(invoice['id'])
+ data = dict_cherry_pick(
+ invoice,
+ (
+ 'label',
+ ('created', 'issue_date'),
+ 'pay_limit_date',
+ 'litigation_date',
+ 'total_amount',
+ 'payment_date',
+ 'amount',
+ 'autobilling',
+ ),
+ )
+ for date_attribute in data.keys():
+ if not date_attribute.endswith('_date'):
+ continue
+ if date_attribute == 'payment_date':
+ data[date_attribute] = get_datetime(data[date_attribute])
+ else:
+ data[date_attribute] = get_date(data[date_attribute])
+ data['paid'] = bool(data.get('payment_date'))
+ Invoice.objects.update_or_create(
+ resource=self, family=family, external_id=invoice['id'], defaults=data
+ )
+ if 'invoices/%s.pdf' % invoice['id'] in archive_files:
+ with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp:
+ fp.write(archive.read('invoices/%s.pdf' % invoice['id']))
# cleanup invoices
Invoice.objects.exclude(external_id__in=invoices).delete()
@@ -526,7 +526,8 @@ class Invoice(models.Model):
if not self.has_pdf:
raise Http404(_('PDF file not found'))
- response = HttpResponse(open(self.pdf_filename(), 'rb').read(), content_type='application/pdf')
+ with open(self.pdf_filename(), 'rb') as fd:
+ response = HttpResponse(fd.read(), content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename=%s.pdf' % self.external_id
return response
diff --git a/passerelle/apps/sp_fr/models.py b/passerelle/apps/sp_fr/models.py
index 399bee07..083f5e2a 100644
--- a/passerelle/apps/sp_fr/models.py
+++ b/passerelle/apps/sp_fr/models.py
@@ -232,76 +232,78 @@ class Resource(BaseResource):
def process(self, fd):
try:
- archive = zipfile.ZipFile(fd)
- except Exception:
+ with zipfile.ZipFile(fd) as archive:
+ # sort files
+ doc_files = []
+ ent_files = []
+ attachments = {}
+ for name in archive.namelist():
+ if ENT_PATTERN.match(name):
+ ent_files.append(name)
+
+ if len(ent_files) != 1:
+ return False, 'too many/few ent files found: %s' % ent_files
+
+ ent_file = ent_files[0]
+
+ with archive.open(ent_file) as fd:
+ document = ET.parse(fd)
+
+ for pj_node in PIECE_JOINTE_XPATH(document):
+ code = CODE_XPATH(pj_node)[0].text
+ code = 'pj_' + code.lower().replace('-', '_')
+ fichier = FICHIER_XPATH(pj_node)[0].text
+ attachments.setdefault(code, []).append(fichier)
+ for doc_node in DOCUMENTS_XPATH(document):
+ code = CODE_XPATH(doc_node)[0].text
+ code = 'doc_' + code.lower().replace('-', '_')
+ fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text
+ attachments.setdefault(code, []).append(fichier)
+
+ doc_files = [
+ value for l in attachments.values() for value in l if value.lower().endswith('.xml')
+ ]
+ if len(doc_files) != 1:
+ return False, 'too many/few doc files found: %s' % doc_files
+
+ for key in attachments:
+ if len(attachments[key]) > 1:
+ return False, 'too many attachments of kind %s: %r' % (key, attachments[key])
+ name = attachments[key][0]
+ with archive.open(attachments[key][0]) as zip_fd:
+ content = zip_fd.read()
+ attachments[key] = {
+ 'filename': name,
+ 'content': base64.b64encode(content).decode('ascii'),
+ 'content_type': 'application/octet-stream',
+ }
+
+ if self.procedure == PROCEDURE_RCO and not attachments:
+ return False, 'no attachments but RCO requires them'
+
+ doc_file = doc_files[0]
+
+ insee_codes = ROUTAGE_XPATH(document)
+ if len(insee_codes) != 1:
+ return False, 'too many/few insee codes found: %s' % insee_codes
+ insee_code = insee_codes[0]
+
+ email = EMAIL_XPATH(document)
+ email = email[0] if email else ''
+
+ data = {
+ 'insee_code': insee_code,
+ 'email': email,
+ }
+ data.update(attachments)
+
+ with archive.open(doc_file) as fd:
+ document = ET.parse(fd)
+ data.update(self.extract_data(document))
+ if hasattr(self, 'update_data_%s' % self.procedure):
+ getattr(self, 'update_data_%s' % self.procedure)(data)
+ except zipfile.BadZipfile:
return False, 'could not load zipfile'
- # sort files
- doc_files = []
- ent_files = []
- attachments = {}
- for name in archive.namelist():
- if ENT_PATTERN.match(name):
- ent_files.append(name)
-
- if len(ent_files) != 1:
- return False, 'too many/few ent files found: %s' % ent_files
-
- ent_file = ent_files[0]
-
- with archive.open(ent_file) as fd:
- document = ET.parse(fd)
-
- for pj_node in PIECE_JOINTE_XPATH(document):
- code = CODE_XPATH(pj_node)[0].text
- code = 'pj_' + code.lower().replace('-', '_')
- fichier = FICHIER_XPATH(pj_node)[0].text
- attachments.setdefault(code, []).append(fichier)
- for doc_node in DOCUMENTS_XPATH(document):
- code = CODE_XPATH(doc_node)[0].text
- code = 'doc_' + code.lower().replace('-', '_')
- fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text
- attachments.setdefault(code, []).append(fichier)
-
- doc_files = [value for l in attachments.values() for value in l if value.lower().endswith('.xml')]
- if len(doc_files) != 1:
- return False, 'too many/few doc files found: %s' % doc_files
-
- for key in attachments:
- if len(attachments[key]) > 1:
- return False, 'too many attachments of kind %s: %r' % (key, attachments[key])
- name = attachments[key][0]
- with archive.open(attachments[key][0]) as zip_fd:
- content = zip_fd.read()
- attachments[key] = {
- 'filename': name,
- 'content': base64.b64encode(content).decode('ascii'),
- 'content_type': 'application/octet-stream',
- }
-
- if self.procedure == PROCEDURE_RCO and not attachments:
- return False, 'no attachments but RCO requires them'
-
- doc_file = doc_files[0]
-
- insee_codes = ROUTAGE_XPATH(document)
- if len(insee_codes) != 1:
- return False, 'too many/few insee codes found: %s' % insee_codes
- insee_code = insee_codes[0]
-
- email = EMAIL_XPATH(document)
- email = email[0] if email else ''
-
- data = {
- 'insee_code': insee_code,
- 'email': email,
- }
- data.update(attachments)
-
- with archive.open(doc_file) as fd:
- document = ET.parse(fd)
- data.update(self.extract_data(document))
- if hasattr(self, 'update_data_%s' % self.procedure):
- getattr(self, 'update_data_%s' % self.procedure)(data)
return data, None
def transfer(self, data):
diff --git a/passerelle/base/management/commands/export_site.py b/passerelle/base/management/commands/export_site.py
index d5fbd2d1..fef84d29 100644
--- a/passerelle/base/management/commands/export_site.py
+++ b/passerelle/base/management/commands/export_site.py
@@ -17,7 +17,8 @@ class Command(BaseCommand):
def handle(self, *args, **options):
if options['output']:
- output = open(options['output'], 'w')
- else:
- output = sys.stdout
+ with open(options['output'], 'w') as output:
+ json.dump(export_site(slugs=options['slugs']), output, indent=4)
+ return
+ output = sys.stdout
json.dump(export_site(slugs=options['slugs']), output, indent=4)
diff --git a/passerelle/utils/zip.py b/passerelle/utils/zip.py
index 501c9b33..91256ba9 100644
--- a/passerelle/utils/zip.py
+++ b/passerelle/utils/zip.py
@@ -244,8 +244,6 @@ class ZipTemplate(object):
def diff_zip(one, two):
- differences = []
-
def compute_diff(one, two, fd_one, fd_two):
content_one = fd_one.read()
content_two = fd_two.read()
@@ -257,32 +255,40 @@ def diff_zip(one, two):
return ['File %s differs' % one] + diff
return 'File %s differs' % one
+ def run(one, two):
+ differences = []
+ with zipfile.ZipFile(one) as one_zip, zipfile.ZipFile(two) as two_zip:
+ one_nl = set(one_zip.namelist())
+ two_nl = set(two_zip.namelist())
+ for name in one_nl - two_nl:
+ differences.append('File %s only in %s' % (name, one))
+ for name in two_nl - one_nl:
+ differences.append('File %s only in %s' % (name, two))
+ for name in one_nl & two_nl:
+ with one_zip.open(name) as fd_one:
+ with two_zip.open(name) as fd_two:
+ difference = compute_diff(name, name, fd_one, fd_two)
+ if difference:
+ differences.append(difference)
+
+ if not differences:
+ # check file order in zip
+ one_zip_files = [zi.filename for zi in one_zip.infolist()]
+ two_zip_files = [zi.filename for zi in two_zip.infolist()]
+ if one_zip_files != two_zip_files:
+ differences.append('Files are not in the same order')
+ return differences
+
if not hasattr(one, 'read'):
- one = open(one, mode='rb')
+ with open(one, mode='rb') as one:
+ if not hasattr(two, 'read'):
+ with open(two, 'rb') as two:
+ return run(one, two)
+ with two:
+ return run(one, two)
with one:
if not hasattr(two, 'read'):
- two = open(two, 'rb')
+ with open(two, 'rb') as two:
+ return run(one, two)
with two:
- with zipfile.ZipFile(one) as one_zip:
- with zipfile.ZipFile(two) as two_zip:
- one_nl = set(one_zip.namelist())
- two_nl = set(two_zip.namelist())
- for name in one_nl - two_nl:
- differences.append('File %s only in %s' % (name, one))
- for name in two_nl - one_nl:
- differences.append('File %s only in %s' % (name, two))
- for name in one_nl & two_nl:
- with one_zip.open(name) as fd_one:
- with two_zip.open(name) as fd_two:
- difference = compute_diff(name, name, fd_one, fd_two)
- if difference:
- differences.append(difference)
-
- if not differences:
- # check file order in zip
- one_zip_files = [zi.filename for zi in one_zip.infolist()]
- two_zip_files = [zi.filename for zi in two_zip.infolist()]
- if one_zip_files != two_zip_files:
- differences.append('Files are not in the same order')
-
- return differences
+ return run(one, two)
diff --git a/tests/test_atos_genesys.py b/tests/test_atos_genesys.py
index 09933d3f..ff6f3885 100644
--- a/tests/test_atos_genesys.py
+++ b/tests/test_atos_genesys.py
@@ -20,9 +20,8 @@ def genesys(db):
@pytest.fixture
def mock_codifications_ok():
- response = open(
- os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')
- ).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')) as fd:
+ response = fd.read()
with tests.utils.mock_url(FAKE_URL, response) as mock:
yield mock
@@ -143,9 +142,8 @@ def test_ws_link_created(app, genesys):
assert Link.objects.count() == 0
-RESPONSE_SELECT_USAGER = open(
- os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')
-).read()
+with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')) as fd:
+ RESPONSE_SELECT_USAGER = fd.read()
def test_ws_dossiers(app, genesys):
diff --git a/tests/test_cartads_cs.py b/tests/test_cartads_cs.py
index 89121d14..8b441d45 100644
--- a/tests/test_cartads_cs.py
+++ b/tests/test_cartads_cs.py
@@ -290,11 +290,13 @@ def test_pieces_management(connector, app, cached_data):
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', b'%PDF...')])
assert resp.json == [{'error': 'The CERFA should be a PDF file.'}]
- pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb').read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb') as fd:
+ pdf_contents = fd.read()
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)])
assert resp.json == [{'error': 'The CERFA should not be a scanned document.'}]
- pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd:
+ pdf_contents = fd.read()
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)])
cerfa_token = resp.json[0]['token']
@@ -554,7 +556,8 @@ def test_doc_pieces_management(connector, app, cached_data):
assert len(data[0]['files']) == 1
assert list(data[0]['files'][0].keys()) == ['url']
- pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd:
+ pdf_contents = fd.read()
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)])
assert resp.json[0]['token']
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1
@@ -594,12 +597,14 @@ def test_daact_pieces_management(connector, app, cached_data):
assert len(piece['files']) == 1
assert list(piece['files'][0].keys()) == ['url']
- pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd:
+ pdf_contents = fd.read()
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)])
assert resp.json[0]['token']
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1
- pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd:
+ pdf_contents = fd.read()
resp = app.post(data[1]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)])
assert resp.json[0]['token']
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 2
diff --git a/tests/test_cityweb.py b/tests/test_cityweb.py
index 882ec06e..553cd768 100644
--- a/tests/test_cityweb.py
+++ b/tests/test_cityweb.py
@@ -60,9 +60,11 @@ def payload(request):
def assert_xml_doc(filename, assertions):
- schema = etree.XMLSchema(etree.parse(open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd'))))
+ with open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd')) as fd:
+ schema = etree.XMLSchema(etree.parse(fd))
- content = open(filename).read()
+ with open(filename) as fd:
+ content = fd.read()
xml_content = etree.fromstring(content)
assert len(xml_content.nsmap) == 1
assert xml_content.nsmap['xs'] == "http://tempuri.org/XMLSchema.xsd"
diff --git a/tests/test_cmis.py b/tests/test_cmis.py
index eb3b1dab..bf9ba4dd 100644
--- a/tests/test_cmis.py
+++ b/tests/test_cmis.py
@@ -529,15 +529,18 @@ def test_raw_uploadfile(mocked_request, app, setup, debug, caplog):
"""simulate the 3 (ordered) HTTP queries involved"""
response = {'status': '200'}
if method == 'GET' and uri == 'http://example.com/cmisatom':
- content = open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb').read()
+ with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd:
+ content = fd.read()
elif (
method == 'GET'
and uri
== 'http://example.com/cmisatom/test/path?path=/test-eo&filter=&includeAllowableActions=false&includeACL=false&includePolicyIds=false&includeRelationships=&renditionFilter='
):
- content = open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb').read()
+ with open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb') as fd:
+ content = fd.read()
elif method == 'POST' and uri == 'http://example.com/cmisatom/test/children?id=L3Rlc3QtZW8%3D':
- expected_input = open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r').read()
+ with open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r') as fd:
+ expected_input = fd.read()
expected_input = expected_input.replace('\n', '')
expected_input = re.sub('> *<', '><', expected_input)
input1 = ET.tostring(ET.XML(expected_input))
@@ -556,7 +559,8 @@ def test_raw_uploadfile(mocked_request, app, setup, debug, caplog):
if input1 != input2:
raise Exception('expect [[%s]] but get [[%s]]' % (body, expected_input))
- content = open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb').read()
+ with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd:
+ content = fd.read()
else:
raise Exception('my fault error, url is not yet mocked: %s' % uri)
return (response, content)
diff --git a/tests/test_family.py b/tests/test_family.py
index 59f72e07..d84e42c4 100644
--- a/tests/test_family.py
+++ b/tests/test_family.py
@@ -375,68 +375,72 @@ def test_fondettes_concerto_validation():
resource.clean()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
- resource.archive = File(open(filepath), 'family_data.zip')
- with pytest.raises(ValidationError):
- resource.clean()
+ with open(filepath) as fd:
+ resource.archive = File(fd, 'family_data.zip')
+ with pytest.raises(ValidationError):
+ resource.clean()
def test_orleans_concerto_loader():
# all related objects will also be deleted
Family.objects.all().delete()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip')
- resource = GenericFamily(
- title='test orleans',
- slug='test-orleans',
- archive=File(open(filepath, 'rb'), 'family_data_orleans.zip'),
- file_format='concerto_orleans',
- )
- from passerelle.apps.family.loaders.concerto_orleans import Loader
+ with open(filepath, 'rb') as fd:
+ resource = GenericFamily(
+ title='test orleans',
+ slug='test-orleans',
+ archive=File(fd, 'family_data_orleans.zip'),
+ file_format='concerto_orleans',
+ )
+ from passerelle.apps.family.loaders.concerto_orleans import Loader
- loader = Loader(resource)
- loader.archive = zipfile.ZipFile(filepath)
+ loader = Loader(resource)
+ with zipfile.ZipFile(filepath) as z:
+ loader.archive = z
- families = loader.build_families()
- assert len(families) == 18
- for family in families.values():
- assert family['external_id']
- assert family['adults']
- assert family['login']
- assert family['password']
- assert family['zipcode']
- assert family['city']
- assert len(family['adults']) > 0
+ families = loader.build_families()
- for adult in family['adults']:
- assert adult['first_name']
- assert adult['last_name']
- assert adult['sex']
- assert adult['external_id']
- assert adult['zipcode']
- assert adult['city']
- assert len(family['children']) >= 1
- for child in family['children']:
- assert child['external_id']
- assert child['first_name']
- assert child['last_name']
- assert child['sex']
- assert 'birthdate' in child
+ assert len(families) == 18
+ for family in families.values():
+ assert family['external_id']
+ assert family['adults']
+ assert family['login']
+ assert family['password']
+ assert family['zipcode']
+ assert family['city']
+ assert len(family['adults']) > 0
- assert 'invoices' in family
- if family['invoices']:
- for invoice in family['invoices']:
- assert invoice['external_id']
- assert invoice['label']
- assert invoice['issue_date']
- assert invoice['online_payment']
- assert 'autobilling' in invoice
- assert 'amount' in invoice
- assert 'total_amount' in invoice
+ for adult in family['adults']:
+ assert adult['first_name']
+ assert adult['last_name']
+ assert adult['sex']
+ assert adult['external_id']
+ assert adult['zipcode']
+ assert adult['city']
+ assert len(family['children']) >= 1
+ for child in family['children']:
+ assert child['external_id']
+ assert child['first_name']
+ assert child['last_name']
+ assert child['sex']
+ assert 'birthdate' in child
- # there are 4 families with invoices in test data
- assert len([f for f in families.values() if f['invoices']]) == 4
- # and 14 families with no invoices
- assert len([f for f in families.values() if not f['invoices']]) == 14
- resource.save()
+ assert 'invoices' in family
+ if family['invoices']:
+ for invoice in family['invoices']:
+ assert invoice['external_id']
+ assert invoice['label']
+ assert invoice['issue_date']
+ assert invoice['online_payment']
+ assert 'autobilling' in invoice
+ assert 'amount' in invoice
+ assert 'total_amount' in invoice
+
+ # there are 4 families with invoices in test data
+ assert len([f for f in families.values() if f['invoices']]) == 4
+ # and 14 families with no invoices
+ assert len([f for f in families.values() if not f['invoices']]) == 14
+ resource.save()
assert Family.objects.filter(resource=resource).count() == 18
assert Adult.objects.all().count() == 31
@@ -510,12 +514,13 @@ def test_family_pending_invoices_by_nameid():
def test_incorrect_orleans_data(caplog):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip')
- GenericFamily.objects.create(
- title='test orleans',
- slug='test-orleans',
- archive=File(open(filepath, 'rb'), 'family_incorrect_data_orleans.zip'),
- file_format='concerto_orleans',
- )
+ with open(filepath, 'rb') as fd:
+ GenericFamily.objects.create(
+ title='test orleans',
+ slug='test-orleans',
+ archive=File(fd, 'family_incorrect_data_orleans.zip'),
+ file_format='concerto_orleans',
+ )
for record in caplog.records:
assert 'Error occured while importing data:' in record.message
assert record.name == 'passerelle.resource.family.test-orleans'
diff --git a/tests/test_generic_endpoint.py b/tests/test_generic_endpoint.py
index 4870787f..a4f01c25 100644
--- a/tests/test_generic_endpoint.py
+++ b/tests/test_generic_endpoint.py
@@ -56,7 +56,8 @@ DEMAND_STATUS = {'closed': True, 'status': 'accepted', 'comment': 'dossier trait
@mock.patch('passerelle.apps.mdel.models.Demand.create_zip', lambda x, y: '1-14-ILE-LA')
def test_generic_payload_logging(caplog, app, mdel):
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json')
- payload = json.load(open(filename))
+ with open(filename) as fd:
+ payload = json.load(fd)
resp = app.post_json('/mdel/test/create', params=payload, status=200)
assert resp.json['data']['demand_id'] == '1-14-ILE-LA'
@@ -82,7 +83,8 @@ def test_generic_payload_logging(caplog, app, mdel):
@mock.patch('passerelle.utils.Request.get')
def test_proxy_logger(mocked_get, caplog, app, arcgis):
- payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd:
+ payload = fd.read()
mocked_get.return_value = tests.utils.FakedResponse(content=payload, status_code=200)
# simple logger
@@ -160,7 +162,8 @@ def test_proxy_logger(mocked_get, caplog, app, arcgis):
@mock.patch('requests.Session.send')
def test_proxy_logger_transaction_id(mocked_send, app, arcgis):
- payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd:
+ payload = fd.read()
mocked_send.return_value = tests.utils.FakedResponse(content=payload, status_code=200)
arcgis.log_evel = 'DEBUG'
arcgis.base_url = 'https://example.net/'
@@ -936,7 +939,8 @@ def test_generic_up_in_endpoints_infos(db, app, connector_class, expected):
def test_generic_endpoint_superuser_access(db, app, admin_user, simple_user):
MDEL.objects.create(slug='test')
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json')
- payload = json.load(open(filename))
+ with open(filename) as fd:
+ payload = json.load(fd)
app = login(app, username='user', password='user')
resp = app.post_json('/mdel/test/create', params=payload, status=403)
diff --git a/tests/test_grenoble_gru.py b/tests/test_grenoble_gru.py
index 1ca281b5..29753403 100644
--- a/tests/test_grenoble_gru.py
+++ b/tests/test_grenoble_gru.py
@@ -127,7 +127,8 @@ def test_contact_mode_typologies_list_with_invalid_xml(app, setup):
with mock.patch('passerelle.utils.Request.post') as request_post:
response = mock.Mock()
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml')
- types = open(types_filename).read()
+ with open(types_filename) as fd:
+ types = fd.read()
response.content = types.replace('Courrier', 'Courrier & autres')
request_post.return_value = response
endpoint = reverse(
@@ -145,7 +146,8 @@ def test_contact_mode_typologies_list(app, setup):
with mock.patch('passerelle.utils.Request.post') as request_post:
response = mock.Mock()
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml')
- response.content = open(types_filename).read()
+ with open(types_filename) as fd:
+ response.content = fd.read()
request_post.return_value = response
endpoint = reverse(
'generic-endpoint',
@@ -171,7 +173,8 @@ def test_contact_mode_typologies_list(app, setup):
def get_typo_response():
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml')
- types = open(types_filename).read()
+ with open(types_filename) as fd:
+ types = fd.read()
typo_response = mock.Mock()
typo_response.content = types
return typo_response
@@ -357,9 +360,8 @@ def test_get_pavs(app, setup):
with mock.patch('passerelle.utils.Request.post') as request_post:
response = mock.Mock()
json_response = mock.Mock()
- json_response.return_value = json.load(
- open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json'))
- )
+ with open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json')) as fd:
+ json_response.return_value = json.load(fd)
response.json = json_response
request_post.return_value = response
response = app.get(
diff --git a/tests/test_import_export.py b/tests/test_import_export.py
index ddbbacfe..6347d612 100644
--- a/tests/test_import_export.py
+++ b/tests/test_import_export.py
@@ -200,7 +200,8 @@ def test_export_to_file(app, setup, filetype):
assert Bdp.objects.count() == 1
bdp.delete()
assert Bdp.objects.count() == 0
- import_site(json.load(open(f.name)), overwrite=True)
+ with open(f.name) as fd:
+ import_site(json.load(fd), overwrite=True)
assert Bdp.objects.count() == 1
diff --git a/tests/test_iparapheur.py b/tests/test_iparapheur.py
index f10ac6e4..cf624f05 100644
--- a/tests/test_iparapheur.py
+++ b/tests/test_iparapheur.py
@@ -73,7 +73,8 @@ def iph_mocked_get(url, params=None, **kwargs):
else:
raise Exception('my fault error, url is not yet mocked: %s' % url)
- response._content = open(target_file, 'rb').read()
+ with open(target_file, 'rb') as fd:
+ response._content = fd.read()
response.status_code = 200
return response
@@ -346,9 +347,8 @@ def test_get_file(mocked_post, mocked_get, app, conn):
response = Response()
response.status_code = 200
- soap_response = open(
- os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb'
- ).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd:
+ soap_response = fd.read()
response._content = force_bytes(soap_response)
mocked_post.return_value = response
@@ -398,9 +398,8 @@ def test_get_file_invalid_appendix(mocked_post, mocked_get, app, conn):
response = Response()
response.status_code = 200
- soap_response = open(
- os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb'
- ).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd:
+ soap_response = fd.read()
response._content = soap_response
mocked_post.return_value = response
@@ -421,9 +420,8 @@ def test_get_file_not_found_appendix(mocked_post, mocked_get, app, conn):
response = Response()
response.status_code = 200
- soap_response = open(
- os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb'
- ).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd:
+ soap_response = fd.read()
response._content = soap_response
mocked_post.return_value = response
@@ -444,9 +442,8 @@ def test_get_file_appendix(mocked_post, mocked_get, app, conn):
response = Response()
response.status_code = 200
- soap_response = open(
- os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb'
- ).read()
+ with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd:
+ soap_response = fd.read()
response._content = soap_response
mocked_post.return_value = response
@@ -521,7 +518,8 @@ def test_call_wsdl(mocked_get, app, conn):
)
resp = app.get(url)
assert resp.headers['content-type'] == 'text/xml'
- assert resp.content == open(wsdl_file(), 'rb').read()
+ with open(wsdl_file(), 'rb') as fd:
+ assert resp.content == fd.read()
@mock.patch('passerelle.utils.Request.get', side_effect=ConnectionError('mocked error'))
@@ -548,12 +546,14 @@ def test_no_auth_on_wsdl_imports(mocked_post, mocked_load, mocked_get, app, conn
"""
response_xmlmime, response_post = Response(), Response()
response_xmlmime.status_code, response_post.status_code = 200, 200
- response_xmlmime._content = open(xmlmime(), 'rb').read()
+ with open(xmlmime(), 'rb') as fd:
+ response_xmlmime._content = fd.read()
response_post._content = force_bytes(
"""