From 3dbee7bc6346e5cd0b13b1c45ed24dc270d021ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laur=C3=A9line=20Gu=C3=A9rin?= Date: Thu, 17 Mar 2022 17:56:30 +0100 Subject: [PATCH] misc: fix consider-using-with pylint error (#62099) --- passerelle/apps/cartads_cs/models.py | 76 ++++---- passerelle/apps/cryptor/models.py | 6 +- .../commands/import_orleans_data.py | 5 +- passerelle/apps/family/models.py | 181 +++++++++--------- passerelle/apps/sp_fr/models.py | 140 +++++++------- .../base/management/commands/export_site.py | 7 +- passerelle/utils/zip.py | 60 +++--- tests/test_atos_genesys.py | 10 +- tests/test_cartads_cs.py | 15 +- tests/test_cityweb.py | 6 +- tests/test_cmis.py | 12 +- tests/test_family.py | 119 ++++++------ tests/test_generic_endpoint.py | 12 +- tests/test_grenoble_gru.py | 14 +- tests/test_import_export.py | 3 +- tests/test_iparapheur.py | 32 ++-- tests/test_mdel.py | 4 +- tests/test_utils_conversion.py | 9 +- tests/test_utils_zip.py | 18 +- 19 files changed, 383 insertions(+), 346 deletions(-) diff --git a/passerelle/apps/cartads_cs/models.py b/passerelle/apps/cartads_cs/models.py index 2e04fd0a..21791a98 100644 --- a/passerelle/apps/cartads_cs/models.py +++ b/passerelle/apps/cartads_cs/models.py @@ -799,42 +799,43 @@ class AbstractCartaDSCS(BaseResource): def pack(self, dossier_id): dossier = CartaDSDossier.objects.get(id=dossier_id) zip_filename = os.path.join(default_storage.path('cartads_cs'), '%s.zip' % dossier.tracking_code) - zip_file = zipfile.ZipFile(zip_filename, mode='w') - liste_pdf = self.liste_pdf(None, dossier.type_dossier_id) - cerfa_id = liste_pdf['data'][0]['id'] - for cerfa in liste_pdf['data']: - if cerfa['id'] == 'AUTRES_DEMANDEURS': - continue - cerfa_id = cerfa['id'] - break - cerfa_id = cerfa_id.replace('*', '-') - pieces = self.pieces(None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code) - for piece in pieces['data']: - cnt = 1 - for file in piece['files']: - if not file.get('id'): + with zipfile.ZipFile(zip_filename, mode='w') as zip_file: + liste_pdf = self.liste_pdf(None, dossier.type_dossier_id) + cerfa_id = liste_pdf['data'][0]['id'] + for cerfa in liste_pdf['data']: + if cerfa['id'] == 'AUTRES_DEMANDEURS': continue - cartads_file = CartaDSFile.objects.get(id=file['id']) - if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id): - zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id) - elif piece['id'].startswith('cerfa-autres-'): - zip_file.write( - cartads_file.uploaded_file.path, - 'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt, - ) - else: - zip_file.write( - cartads_file.uploaded_file.path, - 'Pieces/%s-%s%s%s' - % ( - piece['id'], - piece['codePiece'], - cnt, - os.path.splitext(cartads_file.uploaded_file.path)[-1], - ), - ) - cnt += 1 - zip_file.close() + cerfa_id = cerfa['id'] + break + cerfa_id = cerfa_id.replace('*', '-') + pieces = self.pieces( + None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code + ) + for piece in pieces['data']: + cnt = 1 + for file in piece['files']: + if not file.get('id'): + continue + cartads_file = CartaDSFile.objects.get(id=file['id']) + if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id): + zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id) + elif piece['id'].startswith('cerfa-autres-'): + zip_file.write( + cartads_file.uploaded_file.path, + 'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt, + ) + else: + zip_file.write( + cartads_file.uploaded_file.path, + 'Pieces/%s-%s%s%s' + % ( + piece['id'], + piece['codePiece'], + cnt, + os.path.splitext(cartads_file.uploaded_file.path)[-1], + ), + ) + cnt += 1 dossier.zip_ready = True dossier.save() self.add_job('send_to_cartads', dossier_id=dossier.id) @@ -843,8 +844,9 @@ class AbstractCartaDSCS(BaseResource): ftp = FTP(self.ftp_server) ftp.login(self.ftp_username, self.ftp_password) ftp.cwd(self.ftp_client_name) - ftp.storbinary('STOR %s' % os.path.basename(zip_filename), open(zip_filename, 'rb')) - ftp.quit() + with open(zip_filename, 'rb') as fd: + ftp.storbinary('STOR %s' % os.path.basename(zip_filename), fd) + ftp.quit() def send_to_cartads(self, dossier_id): dossier = CartaDSDossier.objects.get(id=dossier_id) diff --git a/passerelle/apps/cryptor/models.py b/passerelle/apps/cryptor/models.py index bd5df306..2cc1810f 100644 --- a/passerelle/apps/cryptor/models.py +++ b/passerelle/apps/cryptor/models.py @@ -214,9 +214,11 @@ class Cryptor(BaseResource): if not os.path.exists(metadata_filename): raise APIError('unknown uuid', http_status=404) - content = read_decrypt(open(content_filename, 'rb'), self.private_key) + with open(content_filename, 'rb') as fd: + content = read_decrypt(fd, self.private_key) - metadata = json.load(open(metadata_filename, 'r')) + with open(metadata_filename, 'r') as fd: + metadata = json.load(fd) filename = metadata.get('filename') content_type = metadata.get('content_type') diff --git a/passerelle/apps/family/management/commands/import_orleans_data.py b/passerelle/apps/family/management/commands/import_orleans_data.py index c477f9ee..b8d31632 100644 --- a/passerelle/apps/family/management/commands/import_orleans_data.py +++ b/passerelle/apps/family/management/commands/import_orleans_data.py @@ -50,14 +50,15 @@ class Command(BaseCommand): storage = DefaultStorage() lock_filename = storage.path('family-%s/import-orleans-data.lock' % connector.id) try: - fd = open(lock_filename, 'w') + fd = open(lock_filename, 'w') # pylint: disable=consider-using-with fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: raise CommandError('Command already running.') try: archive_name = os.path.basename(options['archive_file']) - connector.archive.save(archive_name, File(open(options['archive_file'], 'rb'))) + with open(options['archive_file'], 'rb') as archive_fd: + connector.archive.save(archive_name, File(archive_fd)) except Exception as e: raise CommandError('Error occured: %s' % e) finally: diff --git a/passerelle/apps/family/models.py b/passerelle/apps/family/models.py index 17493086..88f8aa65 100644 --- a/passerelle/apps/family/models.py +++ b/passerelle/apps/family/models.py @@ -174,14 +174,14 @@ class GenericFamily(BaseResource): def clean(self): if self.archive: try: - archive = zipfile.ZipFile(self.archive) + with zipfile.ZipFile(self.archive) as archive: + if self.file_format != 'native': + modname = 'passerelle.apps.family.loaders.%s' % self.file_format + __import__(modname) + module = sys.modules[modname] + module.Loader(self).clean(archive) except zipfile.BadZipfile: raise ValidationError(_('Invalid zip file.')) - if self.file_format != 'native': - modname = 'passerelle.apps.family.loaders.%s' % self.file_format - __import__(modname) - module = sys.modules[modname] - module.Loader(self).clean(archive) return super().clean() @@ -195,101 +195,101 @@ class GenericFamily(BaseResource): if not os.path.exists(invoices_dir): os.makedirs(invoices_dir) - archive = zipfile.ZipFile(self.archive.path) - if self.file_format != 'native': - modname = 'passerelle.apps.family.loaders.%s' % self.file_format - __import__(modname) - module = sys.modules[modname] - module.Loader(self).load(archive) - return + with zipfile.ZipFile(self.archive.path) as archive: + if self.file_format != 'native': + modname = 'passerelle.apps.family.loaders.%s' % self.file_format + __import__(modname) + module = sys.modules[modname] + module.Loader(self).load(archive) + return - archive_files = archive.namelist() + archive_files = archive.namelist() - family_files = [d for d in archive_files if d.endswith('.json')] - families = [] - invoices = [] - children = [] - adults = [] + family_files = [d for d in archive_files if d.endswith('.json')] + families = [] + invoices = [] + children = [] + adults = [] - for f in family_files: - family_data = json_loads(archive.read(f)) - families.append(family_data['id']) - address = family_data.get('address') or {} - family_data.update(address) - data = dict_cherry_pick( - family_data, - ( - 'login', - 'password', - 'family_quotient', - ('number', 'street_number'), - ('postal_code', 'zipcode'), - ('street', 'street_name'), - ('complement', 'address_complement'), - ), - ) - family, dummy = Family.objects.update_or_create( - external_id=family_data['id'], resource=self, defaults=data - ) - for adult in family_data.get('adults') or []: - adults.append(adult['id']) - adult_address = adult.get('address') or {} - adult.update(adult_address) + for f in family_files: + family_data = json_loads(archive.read(f)) + families.append(family_data['id']) + address = family_data.get('address') or {} + family_data.update(address) data = dict_cherry_pick( - adult, + family_data, ( - 'first_name', - 'last_name', - 'phone', - ('mobile', 'cellphone'), - 'sex', + 'login', + 'password', + 'family_quotient', ('number', 'street_number'), ('postal_code', 'zipcode'), ('street', 'street_name'), ('complement', 'address_complement'), - 'country', ), ) - Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data) - # cleanup adults - Adult.objects.exclude(external_id__in=adults).delete() - - for child in family_data.get('children') or []: - children.append(child['id']) - data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate')) - Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data) - # cleanup children - Child.objects.exclude(external_id__in=children).delete() - - for invoice in family_data['invoices']: - invoices.append(invoice['id']) - data = dict_cherry_pick( - invoice, - ( - 'label', - ('created', 'issue_date'), - 'pay_limit_date', - 'litigation_date', - 'total_amount', - 'payment_date', - 'amount', - 'autobilling', - ), + family, dummy = Family.objects.update_or_create( + external_id=family_data['id'], resource=self, defaults=data ) - for date_attribute in data.keys(): - if not date_attribute.endswith('_date'): - continue - if date_attribute == 'payment_date': - data[date_attribute] = get_datetime(data[date_attribute]) - else: - data[date_attribute] = get_date(data[date_attribute]) - data['paid'] = bool(data.get('payment_date')) - Invoice.objects.update_or_create( - resource=self, family=family, external_id=invoice['id'], defaults=data - ) - if 'invoices/%s.pdf' % invoice['id'] in archive_files: - with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp: - fp.write(archive.read('invoices/%s.pdf' % invoice['id'])) + for adult in family_data.get('adults') or []: + adults.append(adult['id']) + adult_address = adult.get('address') or {} + adult.update(adult_address) + data = dict_cherry_pick( + adult, + ( + 'first_name', + 'last_name', + 'phone', + ('mobile', 'cellphone'), + 'sex', + ('number', 'street_number'), + ('postal_code', 'zipcode'), + ('street', 'street_name'), + ('complement', 'address_complement'), + 'country', + ), + ) + Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data) + # cleanup adults + Adult.objects.exclude(external_id__in=adults).delete() + + for child in family_data.get('children') or []: + children.append(child['id']) + data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate')) + Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data) + # cleanup children + Child.objects.exclude(external_id__in=children).delete() + + for invoice in family_data['invoices']: + invoices.append(invoice['id']) + data = dict_cherry_pick( + invoice, + ( + 'label', + ('created', 'issue_date'), + 'pay_limit_date', + 'litigation_date', + 'total_amount', + 'payment_date', + 'amount', + 'autobilling', + ), + ) + for date_attribute in data.keys(): + if not date_attribute.endswith('_date'): + continue + if date_attribute == 'payment_date': + data[date_attribute] = get_datetime(data[date_attribute]) + else: + data[date_attribute] = get_date(data[date_attribute]) + data['paid'] = bool(data.get('payment_date')) + Invoice.objects.update_or_create( + resource=self, family=family, external_id=invoice['id'], defaults=data + ) + if 'invoices/%s.pdf' % invoice['id'] in archive_files: + with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp: + fp.write(archive.read('invoices/%s.pdf' % invoice['id'])) # cleanup invoices Invoice.objects.exclude(external_id__in=invoices).delete() @@ -526,7 +526,8 @@ class Invoice(models.Model): if not self.has_pdf: raise Http404(_('PDF file not found')) - response = HttpResponse(open(self.pdf_filename(), 'rb').read(), content_type='application/pdf') + with open(self.pdf_filename(), 'rb') as fd: + response = HttpResponse(fd.read(), content_type='application/pdf') response['Content-Disposition'] = 'attachment; filename=%s.pdf' % self.external_id return response diff --git a/passerelle/apps/sp_fr/models.py b/passerelle/apps/sp_fr/models.py index 399bee07..083f5e2a 100644 --- a/passerelle/apps/sp_fr/models.py +++ b/passerelle/apps/sp_fr/models.py @@ -232,76 +232,78 @@ class Resource(BaseResource): def process(self, fd): try: - archive = zipfile.ZipFile(fd) - except Exception: + with zipfile.ZipFile(fd) as archive: + # sort files + doc_files = [] + ent_files = [] + attachments = {} + for name in archive.namelist(): + if ENT_PATTERN.match(name): + ent_files.append(name) + + if len(ent_files) != 1: + return False, 'too many/few ent files found: %s' % ent_files + + ent_file = ent_files[0] + + with archive.open(ent_file) as fd: + document = ET.parse(fd) + + for pj_node in PIECE_JOINTE_XPATH(document): + code = CODE_XPATH(pj_node)[0].text + code = 'pj_' + code.lower().replace('-', '_') + fichier = FICHIER_XPATH(pj_node)[0].text + attachments.setdefault(code, []).append(fichier) + for doc_node in DOCUMENTS_XPATH(document): + code = CODE_XPATH(doc_node)[0].text + code = 'doc_' + code.lower().replace('-', '_') + fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text + attachments.setdefault(code, []).append(fichier) + + doc_files = [ + value for l in attachments.values() for value in l if value.lower().endswith('.xml') + ] + if len(doc_files) != 1: + return False, 'too many/few doc files found: %s' % doc_files + + for key in attachments: + if len(attachments[key]) > 1: + return False, 'too many attachments of kind %s: %r' % (key, attachments[key]) + name = attachments[key][0] + with archive.open(attachments[key][0]) as zip_fd: + content = zip_fd.read() + attachments[key] = { + 'filename': name, + 'content': base64.b64encode(content).decode('ascii'), + 'content_type': 'application/octet-stream', + } + + if self.procedure == PROCEDURE_RCO and not attachments: + return False, 'no attachments but RCO requires them' + + doc_file = doc_files[0] + + insee_codes = ROUTAGE_XPATH(document) + if len(insee_codes) != 1: + return False, 'too many/few insee codes found: %s' % insee_codes + insee_code = insee_codes[0] + + email = EMAIL_XPATH(document) + email = email[0] if email else '' + + data = { + 'insee_code': insee_code, + 'email': email, + } + data.update(attachments) + + with archive.open(doc_file) as fd: + document = ET.parse(fd) + data.update(self.extract_data(document)) + if hasattr(self, 'update_data_%s' % self.procedure): + getattr(self, 'update_data_%s' % self.procedure)(data) + except zipfile.BadZipfile: return False, 'could not load zipfile' - # sort files - doc_files = [] - ent_files = [] - attachments = {} - for name in archive.namelist(): - if ENT_PATTERN.match(name): - ent_files.append(name) - - if len(ent_files) != 1: - return False, 'too many/few ent files found: %s' % ent_files - - ent_file = ent_files[0] - - with archive.open(ent_file) as fd: - document = ET.parse(fd) - - for pj_node in PIECE_JOINTE_XPATH(document): - code = CODE_XPATH(pj_node)[0].text - code = 'pj_' + code.lower().replace('-', '_') - fichier = FICHIER_XPATH(pj_node)[0].text - attachments.setdefault(code, []).append(fichier) - for doc_node in DOCUMENTS_XPATH(document): - code = CODE_XPATH(doc_node)[0].text - code = 'doc_' + code.lower().replace('-', '_') - fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text - attachments.setdefault(code, []).append(fichier) - - doc_files = [value for l in attachments.values() for value in l if value.lower().endswith('.xml')] - if len(doc_files) != 1: - return False, 'too many/few doc files found: %s' % doc_files - - for key in attachments: - if len(attachments[key]) > 1: - return False, 'too many attachments of kind %s: %r' % (key, attachments[key]) - name = attachments[key][0] - with archive.open(attachments[key][0]) as zip_fd: - content = zip_fd.read() - attachments[key] = { - 'filename': name, - 'content': base64.b64encode(content).decode('ascii'), - 'content_type': 'application/octet-stream', - } - - if self.procedure == PROCEDURE_RCO and not attachments: - return False, 'no attachments but RCO requires them' - - doc_file = doc_files[0] - - insee_codes = ROUTAGE_XPATH(document) - if len(insee_codes) != 1: - return False, 'too many/few insee codes found: %s' % insee_codes - insee_code = insee_codes[0] - - email = EMAIL_XPATH(document) - email = email[0] if email else '' - - data = { - 'insee_code': insee_code, - 'email': email, - } - data.update(attachments) - - with archive.open(doc_file) as fd: - document = ET.parse(fd) - data.update(self.extract_data(document)) - if hasattr(self, 'update_data_%s' % self.procedure): - getattr(self, 'update_data_%s' % self.procedure)(data) return data, None def transfer(self, data): diff --git a/passerelle/base/management/commands/export_site.py b/passerelle/base/management/commands/export_site.py index d5fbd2d1..fef84d29 100644 --- a/passerelle/base/management/commands/export_site.py +++ b/passerelle/base/management/commands/export_site.py @@ -17,7 +17,8 @@ class Command(BaseCommand): def handle(self, *args, **options): if options['output']: - output = open(options['output'], 'w') - else: - output = sys.stdout + with open(options['output'], 'w') as output: + json.dump(export_site(slugs=options['slugs']), output, indent=4) + return + output = sys.stdout json.dump(export_site(slugs=options['slugs']), output, indent=4) diff --git a/passerelle/utils/zip.py b/passerelle/utils/zip.py index 501c9b33..91256ba9 100644 --- a/passerelle/utils/zip.py +++ b/passerelle/utils/zip.py @@ -244,8 +244,6 @@ class ZipTemplate(object): def diff_zip(one, two): - differences = [] - def compute_diff(one, two, fd_one, fd_two): content_one = fd_one.read() content_two = fd_two.read() @@ -257,32 +255,40 @@ def diff_zip(one, two): return ['File %s differs' % one] + diff return 'File %s differs' % one + def run(one, two): + differences = [] + with zipfile.ZipFile(one) as one_zip, zipfile.ZipFile(two) as two_zip: + one_nl = set(one_zip.namelist()) + two_nl = set(two_zip.namelist()) + for name in one_nl - two_nl: + differences.append('File %s only in %s' % (name, one)) + for name in two_nl - one_nl: + differences.append('File %s only in %s' % (name, two)) + for name in one_nl & two_nl: + with one_zip.open(name) as fd_one: + with two_zip.open(name) as fd_two: + difference = compute_diff(name, name, fd_one, fd_two) + if difference: + differences.append(difference) + + if not differences: + # check file order in zip + one_zip_files = [zi.filename for zi in one_zip.infolist()] + two_zip_files = [zi.filename for zi in two_zip.infolist()] + if one_zip_files != two_zip_files: + differences.append('Files are not in the same order') + return differences + if not hasattr(one, 'read'): - one = open(one, mode='rb') + with open(one, mode='rb') as one: + if not hasattr(two, 'read'): + with open(two, 'rb') as two: + return run(one, two) + with two: + return run(one, two) with one: if not hasattr(two, 'read'): - two = open(two, 'rb') + with open(two, 'rb') as two: + return run(one, two) with two: - with zipfile.ZipFile(one) as one_zip: - with zipfile.ZipFile(two) as two_zip: - one_nl = set(one_zip.namelist()) - two_nl = set(two_zip.namelist()) - for name in one_nl - two_nl: - differences.append('File %s only in %s' % (name, one)) - for name in two_nl - one_nl: - differences.append('File %s only in %s' % (name, two)) - for name in one_nl & two_nl: - with one_zip.open(name) as fd_one: - with two_zip.open(name) as fd_two: - difference = compute_diff(name, name, fd_one, fd_two) - if difference: - differences.append(difference) - - if not differences: - # check file order in zip - one_zip_files = [zi.filename for zi in one_zip.infolist()] - two_zip_files = [zi.filename for zi in two_zip.infolist()] - if one_zip_files != two_zip_files: - differences.append('Files are not in the same order') - - return differences + return run(one, two) diff --git a/tests/test_atos_genesys.py b/tests/test_atos_genesys.py index 09933d3f..ff6f3885 100644 --- a/tests/test_atos_genesys.py +++ b/tests/test_atos_genesys.py @@ -20,9 +20,8 @@ def genesys(db): @pytest.fixture def mock_codifications_ok(): - response = open( - os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml') - ).read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')) as fd: + response = fd.read() with tests.utils.mock_url(FAKE_URL, response) as mock: yield mock @@ -143,9 +142,8 @@ def test_ws_link_created(app, genesys): assert Link.objects.count() == 0 -RESPONSE_SELECT_USAGER = open( - os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml') -).read() +with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')) as fd: + RESPONSE_SELECT_USAGER = fd.read() def test_ws_dossiers(app, genesys): diff --git a/tests/test_cartads_cs.py b/tests/test_cartads_cs.py index 89121d14..8b441d45 100644 --- a/tests/test_cartads_cs.py +++ b/tests/test_cartads_cs.py @@ -290,11 +290,13 @@ def test_pieces_management(connector, app, cached_data): resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', b'%PDF...')]) assert resp.json == [{'error': 'The CERFA should be a PDF file.'}] - pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb') as fd: + pdf_contents = fd.read() resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) assert resp.json == [{'error': 'The CERFA should not be a scanned document.'}] - pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: + pdf_contents = fd.read() resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) cerfa_token = resp.json[0]['token'] @@ -554,7 +556,8 @@ def test_doc_pieces_management(connector, app, cached_data): assert len(data[0]['files']) == 1 assert list(data[0]['files'][0].keys()) == ['url'] - pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: + pdf_contents = fd.read() resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) assert resp.json[0]['token'] assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1 @@ -594,12 +597,14 @@ def test_daact_pieces_management(connector, app, cached_data): assert len(piece['files']) == 1 assert list(piece['files'][0].keys()) == ['url'] - pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: + pdf_contents = fd.read() resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) assert resp.json[0]['token'] assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1 - pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: + pdf_contents = fd.read() resp = app.post(data[1]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) assert resp.json[0]['token'] assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 2 diff --git a/tests/test_cityweb.py b/tests/test_cityweb.py index 882ec06e..553cd768 100644 --- a/tests/test_cityweb.py +++ b/tests/test_cityweb.py @@ -60,9 +60,11 @@ def payload(request): def assert_xml_doc(filename, assertions): - schema = etree.XMLSchema(etree.parse(open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd')))) + with open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd')) as fd: + schema = etree.XMLSchema(etree.parse(fd)) - content = open(filename).read() + with open(filename) as fd: + content = fd.read() xml_content = etree.fromstring(content) assert len(xml_content.nsmap) == 1 assert xml_content.nsmap['xs'] == "http://tempuri.org/XMLSchema.xsd" diff --git a/tests/test_cmis.py b/tests/test_cmis.py index eb3b1dab..bf9ba4dd 100644 --- a/tests/test_cmis.py +++ b/tests/test_cmis.py @@ -529,15 +529,18 @@ def test_raw_uploadfile(mocked_request, app, setup, debug, caplog): """simulate the 3 (ordered) HTTP queries involved""" response = {'status': '200'} if method == 'GET' and uri == 'http://example.com/cmisatom': - content = open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb').read() + with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd: + content = fd.read() elif ( method == 'GET' and uri == 'http://example.com/cmisatom/test/path?path=/test-eo&filter=&includeAllowableActions=false&includeACL=false&includePolicyIds=false&includeRelationships=&renditionFilter=' ): - content = open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb').read() + with open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb') as fd: + content = fd.read() elif method == 'POST' and uri == 'http://example.com/cmisatom/test/children?id=L3Rlc3QtZW8%3D': - expected_input = open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r').read() + with open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r') as fd: + expected_input = fd.read() expected_input = expected_input.replace('\n', '') expected_input = re.sub('> *<', '><', expected_input) input1 = ET.tostring(ET.XML(expected_input)) @@ -556,7 +559,8 @@ def test_raw_uploadfile(mocked_request, app, setup, debug, caplog): if input1 != input2: raise Exception('expect [[%s]] but get [[%s]]' % (body, expected_input)) - content = open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb').read() + with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd: + content = fd.read() else: raise Exception('my fault error, url is not yet mocked: %s' % uri) return (response, content) diff --git a/tests/test_family.py b/tests/test_family.py index 59f72e07..d84e42c4 100644 --- a/tests/test_family.py +++ b/tests/test_family.py @@ -375,68 +375,72 @@ def test_fondettes_concerto_validation(): resource.clean() filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip') - resource.archive = File(open(filepath), 'family_data.zip') - with pytest.raises(ValidationError): - resource.clean() + with open(filepath) as fd: + resource.archive = File(fd, 'family_data.zip') + with pytest.raises(ValidationError): + resource.clean() def test_orleans_concerto_loader(): # all related objects will also be deleted Family.objects.all().delete() filepath = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip') - resource = GenericFamily( - title='test orleans', - slug='test-orleans', - archive=File(open(filepath, 'rb'), 'family_data_orleans.zip'), - file_format='concerto_orleans', - ) - from passerelle.apps.family.loaders.concerto_orleans import Loader + with open(filepath, 'rb') as fd: + resource = GenericFamily( + title='test orleans', + slug='test-orleans', + archive=File(fd, 'family_data_orleans.zip'), + file_format='concerto_orleans', + ) + from passerelle.apps.family.loaders.concerto_orleans import Loader - loader = Loader(resource) - loader.archive = zipfile.ZipFile(filepath) + loader = Loader(resource) + with zipfile.ZipFile(filepath) as z: + loader.archive = z - families = loader.build_families() - assert len(families) == 18 - for family in families.values(): - assert family['external_id'] - assert family['adults'] - assert family['login'] - assert family['password'] - assert family['zipcode'] - assert family['city'] - assert len(family['adults']) > 0 + families = loader.build_families() - for adult in family['adults']: - assert adult['first_name'] - assert adult['last_name'] - assert adult['sex'] - assert adult['external_id'] - assert adult['zipcode'] - assert adult['city'] - assert len(family['children']) >= 1 - for child in family['children']: - assert child['external_id'] - assert child['first_name'] - assert child['last_name'] - assert child['sex'] - assert 'birthdate' in child + assert len(families) == 18 + for family in families.values(): + assert family['external_id'] + assert family['adults'] + assert family['login'] + assert family['password'] + assert family['zipcode'] + assert family['city'] + assert len(family['adults']) > 0 - assert 'invoices' in family - if family['invoices']: - for invoice in family['invoices']: - assert invoice['external_id'] - assert invoice['label'] - assert invoice['issue_date'] - assert invoice['online_payment'] - assert 'autobilling' in invoice - assert 'amount' in invoice - assert 'total_amount' in invoice + for adult in family['adults']: + assert adult['first_name'] + assert adult['last_name'] + assert adult['sex'] + assert adult['external_id'] + assert adult['zipcode'] + assert adult['city'] + assert len(family['children']) >= 1 + for child in family['children']: + assert child['external_id'] + assert child['first_name'] + assert child['last_name'] + assert child['sex'] + assert 'birthdate' in child - # there are 4 families with invoices in test data - assert len([f for f in families.values() if f['invoices']]) == 4 - # and 14 families with no invoices - assert len([f for f in families.values() if not f['invoices']]) == 14 - resource.save() + assert 'invoices' in family + if family['invoices']: + for invoice in family['invoices']: + assert invoice['external_id'] + assert invoice['label'] + assert invoice['issue_date'] + assert invoice['online_payment'] + assert 'autobilling' in invoice + assert 'amount' in invoice + assert 'total_amount' in invoice + + # there are 4 families with invoices in test data + assert len([f for f in families.values() if f['invoices']]) == 4 + # and 14 families with no invoices + assert len([f for f in families.values() if not f['invoices']]) == 14 + resource.save() assert Family.objects.filter(resource=resource).count() == 18 assert Adult.objects.all().count() == 31 @@ -510,12 +514,13 @@ def test_family_pending_invoices_by_nameid(): def test_incorrect_orleans_data(caplog): filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip') - GenericFamily.objects.create( - title='test orleans', - slug='test-orleans', - archive=File(open(filepath, 'rb'), 'family_incorrect_data_orleans.zip'), - file_format='concerto_orleans', - ) + with open(filepath, 'rb') as fd: + GenericFamily.objects.create( + title='test orleans', + slug='test-orleans', + archive=File(fd, 'family_incorrect_data_orleans.zip'), + file_format='concerto_orleans', + ) for record in caplog.records: assert 'Error occured while importing data:' in record.message assert record.name == 'passerelle.resource.family.test-orleans' diff --git a/tests/test_generic_endpoint.py b/tests/test_generic_endpoint.py index 4870787f..a4f01c25 100644 --- a/tests/test_generic_endpoint.py +++ b/tests/test_generic_endpoint.py @@ -56,7 +56,8 @@ DEMAND_STATUS = {'closed': True, 'status': 'accepted', 'comment': 'dossier trait @mock.patch('passerelle.apps.mdel.models.Demand.create_zip', lambda x, y: '1-14-ILE-LA') def test_generic_payload_logging(caplog, app, mdel): filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json') - payload = json.load(open(filename)) + with open(filename) as fd: + payload = json.load(fd) resp = app.post_json('/mdel/test/create', params=payload, status=200) assert resp.json['data']['demand_id'] == '1-14-ILE-LA' @@ -82,7 +83,8 @@ def test_generic_payload_logging(caplog, app, mdel): @mock.patch('passerelle.utils.Request.get') def test_proxy_logger(mocked_get, caplog, app, arcgis): - payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd: + payload = fd.read() mocked_get.return_value = tests.utils.FakedResponse(content=payload, status_code=200) # simple logger @@ -160,7 +162,8 @@ def test_proxy_logger(mocked_get, caplog, app, arcgis): @mock.patch('requests.Session.send') def test_proxy_logger_transaction_id(mocked_send, app, arcgis): - payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd: + payload = fd.read() mocked_send.return_value = tests.utils.FakedResponse(content=payload, status_code=200) arcgis.log_evel = 'DEBUG' arcgis.base_url = 'https://example.net/' @@ -936,7 +939,8 @@ def test_generic_up_in_endpoints_infos(db, app, connector_class, expected): def test_generic_endpoint_superuser_access(db, app, admin_user, simple_user): MDEL.objects.create(slug='test') filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json') - payload = json.load(open(filename)) + with open(filename) as fd: + payload = json.load(fd) app = login(app, username='user', password='user') resp = app.post_json('/mdel/test/create', params=payload, status=403) diff --git a/tests/test_grenoble_gru.py b/tests/test_grenoble_gru.py index 1ca281b5..29753403 100644 --- a/tests/test_grenoble_gru.py +++ b/tests/test_grenoble_gru.py @@ -127,7 +127,8 @@ def test_contact_mode_typologies_list_with_invalid_xml(app, setup): with mock.patch('passerelle.utils.Request.post') as request_post: response = mock.Mock() types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') - types = open(types_filename).read() + with open(types_filename) as fd: + types = fd.read() response.content = types.replace('Courrier', 'Courrier & autres') request_post.return_value = response endpoint = reverse( @@ -145,7 +146,8 @@ def test_contact_mode_typologies_list(app, setup): with mock.patch('passerelle.utils.Request.post') as request_post: response = mock.Mock() types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') - response.content = open(types_filename).read() + with open(types_filename) as fd: + response.content = fd.read() request_post.return_value = response endpoint = reverse( 'generic-endpoint', @@ -171,7 +173,8 @@ def test_contact_mode_typologies_list(app, setup): def get_typo_response(): types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') - types = open(types_filename).read() + with open(types_filename) as fd: + types = fd.read() typo_response = mock.Mock() typo_response.content = types return typo_response @@ -357,9 +360,8 @@ def test_get_pavs(app, setup): with mock.patch('passerelle.utils.Request.post') as request_post: response = mock.Mock() json_response = mock.Mock() - json_response.return_value = json.load( - open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json')) - ) + with open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json')) as fd: + json_response.return_value = json.load(fd) response.json = json_response request_post.return_value = response response = app.get( diff --git a/tests/test_import_export.py b/tests/test_import_export.py index ddbbacfe..6347d612 100644 --- a/tests/test_import_export.py +++ b/tests/test_import_export.py @@ -200,7 +200,8 @@ def test_export_to_file(app, setup, filetype): assert Bdp.objects.count() == 1 bdp.delete() assert Bdp.objects.count() == 0 - import_site(json.load(open(f.name)), overwrite=True) + with open(f.name) as fd: + import_site(json.load(fd), overwrite=True) assert Bdp.objects.count() == 1 diff --git a/tests/test_iparapheur.py b/tests/test_iparapheur.py index f10ac6e4..cf624f05 100644 --- a/tests/test_iparapheur.py +++ b/tests/test_iparapheur.py @@ -73,7 +73,8 @@ def iph_mocked_get(url, params=None, **kwargs): else: raise Exception('my fault error, url is not yet mocked: %s' % url) - response._content = open(target_file, 'rb').read() + with open(target_file, 'rb') as fd: + response._content = fd.read() response.status_code = 200 return response @@ -346,9 +347,8 @@ def test_get_file(mocked_post, mocked_get, app, conn): response = Response() response.status_code = 200 - soap_response = open( - os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' - ).read() + with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: + soap_response = fd.read() response._content = force_bytes(soap_response) mocked_post.return_value = response @@ -398,9 +398,8 @@ def test_get_file_invalid_appendix(mocked_post, mocked_get, app, conn): response = Response() response.status_code = 200 - soap_response = open( - os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' - ).read() + with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: + soap_response = fd.read() response._content = soap_response mocked_post.return_value = response @@ -421,9 +420,8 @@ def test_get_file_not_found_appendix(mocked_post, mocked_get, app, conn): response = Response() response.status_code = 200 - soap_response = open( - os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' - ).read() + with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: + soap_response = fd.read() response._content = soap_response mocked_post.return_value = response @@ -444,9 +442,8 @@ def test_get_file_appendix(mocked_post, mocked_get, app, conn): response = Response() response.status_code = 200 - soap_response = open( - os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' - ).read() + with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: + soap_response = fd.read() response._content = soap_response mocked_post.return_value = response @@ -521,7 +518,8 @@ def test_call_wsdl(mocked_get, app, conn): ) resp = app.get(url) assert resp.headers['content-type'] == 'text/xml' - assert resp.content == open(wsdl_file(), 'rb').read() + with open(wsdl_file(), 'rb') as fd: + assert resp.content == fd.read() @mock.patch('passerelle.utils.Request.get', side_effect=ConnectionError('mocked error')) @@ -548,12 +546,14 @@ def test_no_auth_on_wsdl_imports(mocked_post, mocked_load, mocked_get, app, conn """ response_xmlmime, response_post = Response(), Response() response_xmlmime.status_code, response_post.status_code = 200, 200 - response_xmlmime._content = open(xmlmime(), 'rb').read() + with open(xmlmime(), 'rb') as fd: + response_xmlmime._content = fd.read() response_post._content = force_bytes( """[publik_test] m'a dit: "ping"! """ ) - mocked_load.return_value = open(wsdl_file(), 'rb').read() + with open(wsdl_file(), 'rb') as fd: + mocked_load.return_value = fd.read() mocked_get.return_value = response_xmlmime mocked_post.return_value = response_post url = reverse( diff --git a/tests/test_mdel.py b/tests/test_mdel.py index cdcbab11..860b2959 100644 --- a/tests/test_mdel.py +++ b/tests/test_mdel.py @@ -57,8 +57,8 @@ def validate_schema(doc, xsd): def check_zip_file(zipdir, expected_files): # check files order in zip - zipres = zipfile.ZipFile(zipdir, 'r') - files = [f.filename for f in zipres.infolist()] + with zipfile.ZipFile(zipdir, 'r') as zipres: + files = [f.filename for f in zipres.infolist()] assert sorted(files[: len(expected_files)]) == sorted(expected_files) diff --git a/tests/test_utils_conversion.py b/tests/test_utils_conversion.py index 249d8cf9..42b58004 100644 --- a/tests/test_utils_conversion.py +++ b/tests/test_utils_conversion.py @@ -4,9 +4,12 @@ from passerelle.utils.conversion import to_pdf def test_pdf_to_pdf_do_nothing(): - pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb') as fd: + pdf = fd.read() assert to_pdf(pdf) == pdf - pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bom.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bom.pdf'), 'rb') as fd: + pdf = fd.read() assert to_pdf(pdf) == pdf - pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bomutf8.pdf'), 'rb').read() + with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bomutf8.pdf'), 'rb') as fd: + pdf = fd.read() assert to_pdf(pdf) == pdf diff --git a/tests/test_utils_zip.py b/tests/test_utils_zip.py index 5176541d..cc37ef6c 100644 --- a/tests/test_utils_zip.py +++ b/tests/test_utils_zip.py @@ -146,20 +146,18 @@ def test_with_parts(tpl_builder, dest): with full_path.open('rb') as fd: with zipfile.ZipFile(fd) as zi: assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] - assert ( - zi.open('coucou-10-part1.xml').read().decode('utf-8') - == 'blabla' - ) - assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == 'Héllo World!' + with zi.open('coucou-10-part1.xml') as zfd: + assert zfd.read().decode('utf-8') == 'blabla' + with zi.open('coucou-10-dôc.xml') as zfd: + assert zfd.read().decode('utf-8') == 'Héllo World!' with io.BytesIO(z.render_to_bytes()) as fd: with zipfile.ZipFile(fd) as zi: assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] - assert ( - zi.open('coucou-10-part1.xml').read().decode('utf-8') - == 'blabla' - ) - assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == 'Héllo World!' + with zi.open('coucou-10-part1.xml') as zfd: + assert zfd.read().decode('utf-8') == 'blabla' + with zi.open('coucou-10-dôc.xml') as zfd: + assert zfd.read().decode('utf-8') == 'Héllo World!' def test_xml_error(tpl_builder, dest):