misc: remove unused-variable pylint error (#62099)

This commit is contained in:
Lauréline Guérin 2022-03-17 16:47:33 +01:00
parent c1074b68fc
commit 35512c9c2f
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
63 changed files with 139 additions and 159 deletions

View File

@ -263,7 +263,7 @@ class SqlFormatter(string.Formatter):
def validate_where(format_string):
formatter = SqlFormatter()
for prefix, ref, format_spec, conversion in formatter.parse(format_string):
for dummy, ref, dummy, dummy in formatter.parse(format_string):
if ref is None:
pass
elif ref == '':

View File

@ -732,7 +732,7 @@ class AstreGS(BaseResource):
contact = self.call('Contact', 'Creation', Contact=post_data)
# address should be set separately
post_data['EncodeKeyContact'] = contact.idContact
address = self.call('ContactAdresses', 'Creation', ContactAdresses=post_data)
self.call('ContactAdresses', 'Creation', ContactAdresses=post_data)
return {'data': serialize_object(contact)}
@endpoint(

View File

@ -519,7 +519,7 @@ class Resource(BaseResource, HTTPResource):
},
)
def link_by_id_per(self, request, NameID, id_per):
dossier = self.call_select_usager(id_per)
self.call_select_usager(id_per)
link, created = Link.objects.get_or_create(resource=self, name_id=NameID, id_per=id_per)
return {'link_id': link.pk, 'new': created}

View File

@ -447,7 +447,7 @@ class BaseAdresse(AddressResource):
department
)
)
except RequestException as e:
except RequestException:
continue
if ban_gz.status_code != 200:
continue

View File

@ -10,7 +10,7 @@ def migrate_subscribers(apps, schema_editor):
CartaDSSubscriber = apps.get_model('cartads_cs', 'CartaDSSubscriber')
for instance in CartaDSDossier.objects.all():
if instance.name_id:
subscriber, created = CartaDSSubscriber.objects.get_or_create(name_id=instance.name_id)
subscriber, _ = CartaDSSubscriber.objects.get_or_create(name_id=instance.name_id)
instance.subscribers.add(subscriber)

View File

@ -198,7 +198,7 @@ class AbstractCartaDSCS(BaseResource):
# communes
resp = client.service.GetCommunes(self.get_token(), {})
communes_cache, created = CartaDSDataCache.objects.get_or_create(data_type='communes')
communes_cache, dummy = CartaDSDataCache.objects.get_or_create(data_type='communes')
communes_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]}
communes_cache.save()
@ -208,7 +208,7 @@ class AbstractCartaDSCS(BaseResource):
resp = client.service.GetTypesDossier(self.get_token(), int(commune['id']), {})
if resp is None:
continue
data_cache, created = CartaDSDataCache.objects.get_or_create(
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
data_type='types_dossier', data_parameters={'commune_id': int(commune['id'])}
)
data_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]}
@ -222,7 +222,7 @@ class AbstractCartaDSCS(BaseResource):
resp = client.service.GetObjetsDemande(self.get_token(), type_dossier_id)
if resp is None:
continue
data_cache, created = CartaDSDataCache.objects.get_or_create(
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
data_type='objets_demande', data_parameters={'type_dossier_id': type_dossier_id}
)
data_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]}
@ -252,7 +252,7 @@ class AbstractCartaDSCS(BaseResource):
pass
return u'%(Nom)s' % x
data_cache, created = CartaDSDataCache.objects.get_or_create(
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
data_type='liste_pdf',
data_parameters={
'type_dossier_id': type_dossier_id,
@ -284,7 +284,7 @@ class AbstractCartaDSCS(BaseResource):
resp = client.service.GetPieces(self.get_token(), type_dossier_id, objet_demande_id)
if resp is None:
continue
data_cache, created = CartaDSDataCache.objects.get_or_create(
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
data_type='pieces',
data_parameters={
'type_dossier_id': type_dossier_id,
@ -459,7 +459,7 @@ class AbstractCartaDSCS(BaseResource):
},
)
def pieces(self, request, type_dossier_id, objet_demande_id, tracking_code, demolitions=True):
cache, created = CartaDSDataCache.objects.get_or_create(
cache, dummy = CartaDSDataCache.objects.get_or_create(
data_type='pieces',
data_parameters={
'type_dossier_id': type_dossier_id,
@ -740,7 +740,6 @@ class AbstractCartaDSCS(BaseResource):
# this cannot be verb DELETE as we have no way to set
# Access-Control-Allow-Methods
signer = Signer(salt='cart@ds_cs')
tracking_code = signer.unsign(token)
CartaDSFile.objects.filter(id=signer.unsign(file_upload)).delete()
return {'err': 0}
@ -1058,7 +1057,6 @@ class AbstractCartaDSCS(BaseResource):
piece.save()
def get_file_status(self, dossier):
extra = None
response = {}
if dossier.deleted:
status_id = 'deleted'

View File

@ -86,7 +86,7 @@ class ChoositSMSGateway(SMSResource):
else:
try:
output = r.json()
except ValueError as e:
except ValueError:
results.append('Choosit error: bad JSON response')
else:
if not isinstance(output, dict):

View File

@ -182,10 +182,6 @@ class CsvDataSource(BaseResource):
return result
def cache_data(self):
# FIXME: why are those dead variables computed?
titles = [t.strip() for t in self.columns_keynames.split(',')]
indexes = [titles.index(t) for t in titles if t]
caption = [titles[i] for i in indexes]
with transaction.atomic():
TableRow.objects.filter(resource=self).delete()
for block in batch(enumerate(self.get_rows()), 5000):
@ -274,7 +270,7 @@ class CsvDataSource(BaseResource):
rows = [[smart_text(x) for x in y] for y in rows if y]
titles = [t.strip() for t in self.columns_keynames.split(',')]
indexes = [titles.index(t) for t in titles if t]
caption = [titles[i] for i in indexes]
captions = [titles[i] for i in indexes]
def get_cell(row, index):
try:
@ -282,7 +278,7 @@ class CsvDataSource(BaseResource):
except IndexError:
return ''
return [{caption: get_cell(row, index) for caption, index in zip(caption, indexes)} for row in rows]
return [{caption: get_cell(row, index) for caption, index in zip(captions, indexes)} for row in rows]
def get_cached_rows(self, initial=True):
found = False

View File

@ -211,7 +211,7 @@ class ESirius(BaseResource, HTTPResource):
post_data['user']['address'] = {}
post_data['codeRDV'] = id
response = self.request('appointments', method='PUT', json=post_data)
self.request('appointments', method='PUT', json=post_data)
return {'data': {'id': id, 'updated': True}}
@endpoint(
@ -245,5 +245,5 @@ class ESirius(BaseResource, HTTPResource):
},
)
def delete_appointment(self, request, id):
response = self.request('appointments/%s/' % id, method='DELETE')
self.request('appointments/%s/' % id, method='DELETE')
return {'data': {'id': id, 'deleted': True}}

View File

@ -74,7 +74,7 @@ class Loader(object):
invoice['online_payment'] = False
invoice['no_online_payment_reason'] = 'autobilling'
obj, created = Invoice.objects.update_or_create(
obj, dummy = Invoice.objects.update_or_create(
resource=self.connector, external_id=row['ID_FAC'], defaults=invoice
)
invoice_filename = '%s_%s.pdf' % (

View File

@ -177,7 +177,7 @@ class Loader(object):
'city',
),
)
family, created = Family.objects.update_or_create(
family, dummy = Family.objects.update_or_create(
external_id=family_data['external_id'], resource=self.connector, defaults=data
)
@ -198,7 +198,7 @@ class Loader(object):
invoice_path = os.path.join(invoices_dir, invoice_filename)
# create invoice object only if associated pdf exists
if os.path.exists(invoice_path):
invoice, created = Invoice.objects.update_or_create(
invoice, dummy = Invoice.objects.update_or_create(
resource=self.connector,
family=family,
external_id=invoice_data['external_id'],

View File

@ -49,7 +49,7 @@ class Loader:
'no_online_payment_reason': None,
'label': external_id,
}
obj, created = Invoice.objects.update_or_create(
Invoice.objects.update_or_create(
resource=self.connector, external_id=external_id, defaults=invoice
)
external_ids.append(external_id)

View File

@ -75,7 +75,7 @@ class Loader(object):
invoice['online_payment'] = False
invoice['no_online_payment_reason'] = 'autobilling'
obj, created = Invoice.objects.update_or_create(
obj, dummy = Invoice.objects.update_or_create(
resource=self.connector, external_id=row['ID_FAC'], defaults=invoice
)
invoice_filename = '%s_%s.pdf' % (

View File

@ -228,7 +228,7 @@ class GenericFamily(BaseResource):
('complement', 'address_complement'),
),
)
family, created = Family.objects.update_or_create(
family, dummy = Family.objects.update_or_create(
external_id=family_data['id'], resource=self, defaults=data
)
for adult in family_data.get('adults') or []:

View File

@ -103,7 +103,7 @@ class FranceConnect:
self.add('fc_token_endpoint_response', response_content)
self.add('fc_access_token', response_content['access_token'])
self.add('fc_id_token', response_content['id_token'])
header, payload, signature = self.fc_id_token.split('.')
dummy, payload, dummy = self.fc_id_token.split('.')
self.add('fc_id_token_payload', json.loads(base64url_decode(payload.encode())))
except Exception as e:
raise FranceConnectError('Error in token endpoint response', sub_exception=repr(e))

View File

@ -276,7 +276,7 @@ class Gesbac(BaseResource):
with transaction.atomic():
form = Form.objects.create(resource=self, form_id=form_id, counter=counter)
break
except IntegrityError as e:
except IntegrityError:
continue
else:
raise APIError('fail: more than 20 demands')

View File

@ -66,7 +66,7 @@ def zipdir(path):
"""Zip directory"""
archname = path + '.zip'
with zipfile.ZipFile(archname, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(path):
for root, dummy, files in os.walk(path):
for f in files:
fpath = os.path.join(root, f)
zipf.write(fpath, os.path.basename(fpath))

View File

@ -107,5 +107,5 @@ class OrangeSMSGateway(SMSResource):
access_token = self.get_access_token()
group_id = self.group_id_from_name(access_token)
response = self.diffusion(access_token, group_id, destinations, text, sender)
self.diffusion(access_token, group_id, destinations, text, sender)
return None # credit consumed is unknown

View File

@ -161,7 +161,7 @@ class OVHSMSGateway(SMSResource):
else:
try:
result = response.json()
except ValueError as e:
except ValueError:
raise APIError('OVH error: bad JSON response')
try:
response.raise_for_status()
@ -257,7 +257,7 @@ class OVHSMSGateway(SMSResource):
else:
try:
result = response.json()
except ValueError as e:
except ValueError:
raise APIError('OVH error: bad JSON response')
else:
if not isinstance(result, dict):

View File

@ -33,12 +33,12 @@ class RequestTokenView(RedirectView):
return connector.get_absolute_url()
try:
result = resp.json()
except ValueError as e:
except ValueError:
messages.error(self.request, _('There has been an error requesting token: bad JSON response.'))
return connector.get_absolute_url()
try:
resp.raise_for_status()
except RequestException as e:
except RequestException:
error_text = result.get('message', result)
messages.error(self.request, _('There has been an error requesting token: %s.') % error_text)
return connector.get_absolute_url()

View File

@ -150,7 +150,7 @@ class PloneRestApi(BaseResource, HTTPResource):
return token
def request(self, path='', method='GET', params=None, json=None):
scheme, netloc, base_path, query, fragment = urlsplit(self.service_url)
scheme, netloc, base_path, dummy, fragment = urlsplit(self.service_url)
url = urlunsplit((scheme, netloc, base_path + '/%s' % path, '', fragment))
headers = {'Accept': 'application/json'}
kwargs = {'method': method, 'url': url, 'headers': headers, 'params': params, 'json': json}
@ -164,7 +164,7 @@ class PloneRestApi(BaseResource, HTTPResource):
if response.status_code != 204: # No Content
try:
json_response = response.json()
except ValueError as e:
except ValueError:
raise APIError('PloneRestApi: bad JSON response')
try:
response.raise_for_status()

View File

@ -289,7 +289,7 @@ class Solis(BaseResource):
token = self.apa_token(user_id, code) # invalid credentials raise APIError here
information = self.apa_get_information(information='exportDonneesIndividu', token=token)
text = get_template(self.text_template_name).render(information).strip()
link, created = SolisAPALink.objects.update_or_create(
dummy, created = SolisAPALink.objects.update_or_create(
resource=self, name_id=name_id, user_id=user_id, defaults={'code': code, 'text': text}
)
return {'data': {'user_id': user_id, 'created': created, 'updated': not created}}
@ -580,7 +580,7 @@ class Solis(BaseResource):
self.rsa_token(user_id, code, dob) # invalid credentials raise APIError here
information = self.rsa_get_information('individu', user_id, code, dob)
text = get_template(self.text_template_name_rsa).render(information).strip()
link, created = SolisRSALink.objects.update_or_create(
dummy, created = SolisRSALink.objects.update_or_create(
resource=self, name_id=name_id, user_id=user_id, defaults={'code': code, 'dob': dob, 'text': text}
)
return {'data': {'user_id': user_id, 'text': text, 'created': created, 'updated': not created}}

View File

@ -562,7 +562,7 @@ class Mapping(models.Model):
def variables(self):
yield 'insee_code'
yield 'email'
for path, xsd_type in self.xsd.paths():
for path, dummy in self.xsd.paths():
names = [simplify(tag.localname) for tag in path]
yield '_'.join(names)
if hasattr(self, 'variables_%s' % self.procedure):

View File

@ -252,7 +252,6 @@ class VivaTicket(BaseResource):
else:
internal_code = response.json()['InternalCode']
# update contact data
contact_data = response.json()
url = urlparse.urljoin(self.url, 'Contact/Put')
response = self.requests.put(
url,

View File

@ -13,7 +13,7 @@ def set_logging_parameters(apps, schema_editor):
continue
content_type = ContentType.objects.get_for_model(model)
for instance in model.objects.all():
parameters, created = LoggingParameters.objects.get_or_create(
parameters, _ = LoggingParameters.objects.get_or_create(
resource_type=content_type, resource_pk=instance.id
)
parameters.log_level = instance.log_level

View File

@ -279,7 +279,7 @@ class BaseResource(models.Model):
def get_endpoints_infos(self):
endpoints = []
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
for dummy, method in inspect.getmembers(self, predicate=inspect.ismethod):
if hasattr(method, 'endpoint_info'):
method.endpoint_info.object = self
endpoint_name = method.endpoint_info.name
@ -397,7 +397,7 @@ class BaseResource(models.Model):
app_label, model_name = d['resource_type'].split('.')
try:
model = apps.get_model(app_label, model_name)
except LookupError as e:
except LookupError:
raise BaseResource.UnknownBaseResourceError(app_label)
try:
instance = model.objects.get(slug=d['slug'])
@ -632,7 +632,7 @@ class BaseResource(models.Model):
def handle_job_error(self, job, exc_info):
from passerelle.utils.conversion import exception_to_text
(exc_type, exc_value, tb) = exc_info
(exc_type, exc_value, dummy) = exc_info
job.status = 'failed'
job.done_timestamp = timezone.now()
job.status_details = {
@ -1005,7 +1005,7 @@ class ProxyLogger(object):
attr['sourceip'] = sourceip
if kwargs.get('exc_info'):
(exc_type, exc_value, tb) = sys.exc_info()
(exc_type, exc_value, dummy) = sys.exc_info()
attr['extra']['error_summary'] = traceback.format_exception_only(exc_type, exc_value)
ResourceLog.objects.create(**attr)

View File

@ -39,7 +39,7 @@ class Adict(BaseResource):
},
)
def feature_info(self, request, lat, lon):
params = query_args = {'x': lon, 'y': lat, 'srid': '4326'}
query_args = {'x': lon, 'y': lat, 'srid': '4326'}
query_args['token'] = self.api_token
query_args['sector_type'] = self.sector_type
response = self.requests.get(

View File

@ -126,7 +126,7 @@ class GrandLyonStreetSections(BaseResource):
for value in json_loads(sections).get('values'):
if not value.get('codefuv') or not value.get('codetroncon'):
continue
section, created = StreetSection.objects.get_or_create(
section, dummy = StreetSection.objects.get_or_create(
codefuv=value.get('codefuv'), codetroncon=value.get('codetroncon')
)
for attribute in ('nom', 'nomcommune', 'domanialite', 'codeinsee'):

View File

@ -158,7 +158,7 @@ class Greco(BaseResource):
message.attach(part)
message._write_headers = lambda x: None
msg_x = message.as_string(unixfrom=False)
message.as_string(unixfrom=False)
# RFC 2045 defines MIME multipart boundaries:
# * boundary := 0*69<bchars> bcharsnospace
# * dash-boundary := "--" boundary

View File

@ -57,7 +57,7 @@ def build_message(data):
message['InformationsBancaires'] = get_info_bancaire(fields, wf)
message['Patrimoine'] = get_patrimoine(fields, wf)
etablissement, etablissement_date_entree = get_etablissement(fields, wf)
etablissement, dummy = get_etablissement(fields, wf)
demande_apa = {
'statutDemande': statut_demande,

View File

@ -61,7 +61,7 @@ class SolisAPA(BaseResource):
try:
ret = response.json()
return ret
except (ValueError) as e:
except (ValueError):
raise APIError('Response content is not a valid JSON')
def get_resource_url(self, uri):
@ -197,7 +197,7 @@ class SolisAPA(BaseResource):
def get_lieux(self, q, commune, departement):
# si commune est un code solis de la forme commune-dep-com
if commune and commune.startswith('commune-'):
x, departement, commune = commune.split('-')
dummy, departement, commune = commune.split('-')
call = self._conciliation(
conciliation.CONCILIATION_ADRESSE, commune=commune, departement=departement, lieu='%%%s%%' % q
)

View File

@ -131,7 +131,7 @@ class Tcl(BaseResource):
response = self.requests.get(url)
response.raise_for_status()
for line_data in response.json()['values']:
line, created = Line.objects.get_or_create(
line, dummy = Line.objects.get_or_create(
code_titan=line_data['code_titan'],
defaults={'transport_key': key, 'ligne': line_data['ligne']},
)
@ -145,7 +145,7 @@ class Tcl(BaseResource):
for feature in response.json()['features']:
arret_data = feature['properties']
arret_data['id_data'] = arret_data.pop('id')
stop, created = Stop.objects.get_or_create(id_data=arret_data['id_data'])
stop, dummy = Stop.objects.get_or_create(id_data=arret_data['id_data'])
stop.__dict__.update(arret_data)
stop.pmr = bool(stop.pmr == 't')
stop.escalator = bool(stop.escalator == 't')

View File

@ -78,7 +78,7 @@ class ToulouseAxel(BaseResource):
def lock(self, request, key, locker):
if not key:
raise APIError('key is empty', err_code='bad-request', http_status=400)
lock, created = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
lock, dummy = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date}
@endpoint(

View File

@ -475,7 +475,7 @@ class WcsRequestFile(models.Model):
files = {'media': (self.filename, self.content.open('rb'), self.content_type)}
try:
instance.request(url, method='PUT', files=files)
except APIError as e:
except APIError:
return False
self.content.delete()
return True
@ -491,12 +491,12 @@ class SmartRequest(models.Model):
result = JSONField(null=True)
def get_wcs_api(self, base_url):
scheme, netloc, path, params, query, fragment = urlparse.urlparse(base_url)
scheme, netloc, dummy, dummy, dummy, dummy = urlparse.urlparse(base_url)
services = settings.KNOWN_SERVICES.get('wcs', {})
service = None
for service in services.values():
remote_url = service.get('url')
r_scheme, r_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse(remote_url)
r_scheme, r_netloc, dummy, dummy, dummy, dummy = urlparse.urlparse(remote_url)
if r_scheme == scheme and r_netloc == netloc:
break
else:

View File

@ -5,7 +5,7 @@ from django.utils.encoding import force_text
def client_to_jsondict(client):
"""return description of the client, as dict (for json export)"""
res = {}
for i, sd in enumerate(client.sd):
for sd in client.sd:
d = {}
d['tns'] = sd.wsdl.tns[1]
d['prefixes'] = dict(p for p in sd.prefixes)

View File

@ -427,7 +427,7 @@ class GenericEndpointView(GenericConnectorMixin, SingleObjectMixin, View):
self.init_stuff(request, *args, **kwargs)
self.connector = self.get_object()
self.endpoint = None
for name, method in inspect.getmembers(self.connector, inspect.ismethod):
for dummy, method in inspect.getmembers(self.connector, inspect.ismethod):
if not hasattr(method, 'endpoint_info'):
continue
if not method.endpoint_info.name == kwargs.get('endpoint'):

View File

@ -79,4 +79,4 @@ def test_airquality_details(app, airquality):
def test_airquality_details_unknown_city(app, airquality):
endpoint = tests.utils.generic_endpoint_url('airquality', 'details', slug=airquality.slug)
resp = app.get(endpoint + '/fr/paris/', status=404)
app.get(endpoint + '/fr/paris/', status=404)

View File

@ -478,17 +478,14 @@ def test_document_association(app, resource, mock_api_entreprise, freezer):
assert len(data) == 3
document = data[0]
assert 'url' in document
resp = app.get(
document['url'], params={'context': 'MSP', 'object': 'demand', 'recipient': 'siret'}, status=200
)
app.get(document['url'], params={'context': 'MSP', 'object': 'demand', 'recipient': 'siret'}, status=200)
# try to get document with wrong signature
url = document['url']
wrong_url = document['url'] + "wrong/"
resp = app.get(wrong_url, status=404)
app.get(wrong_url, status=404)
# try expired url
freezer.move_to(timezone.now() + timezone.timedelta(days=8))
resp = app.get(document['url'], status=404)
app.get(document['url'], status=404)
def test_effectifs_annuels_acoss_covid(app, resource, mock_api_entreprise, freezer):

View File

@ -375,7 +375,7 @@ def test_detail_page(app, resource, admin_user):
)
def test_api_particulier_dont_log_not_found(app, resource, mock, should_log):
with HTTMock(mock):
resp = endpoint_get(
endpoint_get(
'/api-particulier/test/avis-imposition',
app,
resource,

View File

@ -81,7 +81,7 @@ def test_check_status(mocked_get, connector):
del hello_response['Data']
mocked_get.return_value = tests.utils.FakedResponse(content=json.dumps(hello_response), status_code=200)
with pytest.raises(Exception) as error:
resp = connector.check_status()
connector.check_status()
assert str(error.value) == 'Invalid credentials'
@ -131,7 +131,6 @@ def test_get_user_forms(mocked_post, mocked_get, app, connector):
mocked_post.return_value = tests.utils.FakedResponse(content=FAKE_LOGIN_OIDC_RESPONSE, status_code=200)
mocked_get.return_value = tests.utils.FakedResponse(content=FAKE_USER_DEMANDS_RESPONSE, status_code=200)
resp = app.get(endpoint)
result = resp.json
assert resp.json['data']
for item in resp.json['data']:
assert item['status'] == 'Deposee'

View File

@ -40,7 +40,7 @@ def test_authentication(app, connector):
status=200,
)
resp = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
assert len(responses.calls) == 1
assert responses.calls[0].request.headers['login'] == 'admin'
assert responses.calls[0].request.headers['database'] == 'db'
@ -48,7 +48,7 @@ def test_authentication(app, connector):
connector.auth = 'AD'
connector.save()
resp = app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
app.get('/astre-rest/slug-astre-rest/gf-documents-entites-getref?entity_type=who&entity_code=what')
assert len(responses.calls) == 2
assert responses.calls[1].request.headers['login'] == 'admin'
assert responses.calls[1].request.headers['database'] == 'db'

View File

@ -230,7 +230,7 @@ def test_base_adresse_search_qs_citycode(mocked_get, app, base_adresse):
@mock.patch('passerelle.utils.Request.get')
def test_base_adresse_search_qs_lat_lon(mocked_get, app, base_adresse):
resp = app.get('/base-adresse/%s/search?q=plop&lat=0&lon=1' % base_adresse.slug)
app.get('/base-adresse/%s/search?q=plop&lat=0&lon=1' % base_adresse.slug)
assert 'lat=0' in mocked_get.call_args[0][0]
assert 'lon=1' in mocked_get.call_args[0][0]
@ -590,7 +590,7 @@ def test_base_adresse_cities_dash_in_q(app, base_adresse, miquelon):
def test_base_adresse_cities_region_department(app, base_adresse, miquelon, city):
reg = RegionModel.objects.create(name=u'IdF', code='11', resource=base_adresse)
dep = DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse)
paris = CityModel.objects.create(
CityModel.objects.create(
name=u'Paris',
code='75056',
zipcode='75014',
@ -670,7 +670,7 @@ def test_base_adresse_departments(app, base_adresse, department, region):
def test_base_adresse_departments_region(app, base_adresse, department):
reg = RegionModel.objects.create(name=u'IdF', code='11', resource=base_adresse)
paris = DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse)
DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse)
resp = app.get('/base-adresse/%s/departments?region_code=84' % base_adresse.slug)
result = resp.json['data']
@ -820,7 +820,7 @@ def test_base_adresse_command_update_geo_invalid(mocked_get, db, base_adresse):
@pytest.mark.usefixtures('mock_update_api_geo')
@mock.patch('passerelle.utils.Request.get', side_effect=ConnectionError)
def test_base_adresse_command_update_street_timeout(mocked_get, db, base_adresse):
resp = call_command('cron', 'daily')
call_command('cron', 'daily')
assert mocked_get.call_count == 1
assert not RegionModel.objects.exists()
@ -874,11 +874,11 @@ def test_base_adresse_addresses_qs_citycode(mocked_get, app, base_adresse):
@mock.patch('passerelle.utils.Request.get')
def test_base_adresse_addresses_qs_coordinates(mocked_get, app, base_adresse_coordinates):
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse_coordinates.slug)
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse_coordinates.slug)
assert 'lat=%s' % base_adresse_coordinates.latitude in mocked_get.call_args[0][0]
assert 'lon=%s' % base_adresse_coordinates.longitude in mocked_get.call_args[0][0]
resp = app.get('/base-adresse/%s/addresses?q=plop&lat=42&lon=43' % base_adresse_coordinates.slug)
app.get('/base-adresse/%s/addresses?q=plop&lat=42&lon=43' % base_adresse_coordinates.slug)
assert 'lat=42' in mocked_get.call_args[0][0]
assert 'lon=43' in mocked_get.call_args[0][0]
@ -926,7 +926,7 @@ def test_base_adresse_addresses_cache_err(app, base_adresse, mock_api_adresse_da
@pytest.mark.usefixtures('mock_update_api_geo', 'mock_update_streets')
def test_base_adresse_addresses_clean_cache(app, base_adresse, freezer, mock_api_adresse_data_gouv_fr_search):
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
assert AddressCacheModel.objects.count() == 1
freezer.move_to(datetime.timedelta(minutes=30))
@ -937,17 +937,17 @@ def test_base_adresse_addresses_clean_cache(app, base_adresse, freezer, mock_api
call_command('cron', 'hourly')
assert AddressCacheModel.objects.count() == 0
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
assert AddressCacheModel.objects.count() == 1
# asking for the address again resets the timestamp
freezer.move_to(datetime.timedelta(hours=1, seconds=1))
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
call_command('cron', 'hourly')
assert AddressCacheModel.objects.count() == 1
freezer.move_to(datetime.timedelta(hours=1, seconds=1))
resp = app.get(
app.get(
'/base-adresse/%s/addresses?id=%s'
% (base_adresse.slug, '49007_6950_be54bd~47.474633~-0.593775~Rue%20Roger%20Halope%2049000%20Angers')
)

View File

@ -369,7 +369,7 @@ def test_send(connector, app, cached_data):
# test_pack
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client:
client.return_value = mock.Mock(service=FakeService())
with mock.patch('passerelle.apps.cartads_cs.models.FTP') as FTP:
with mock.patch('passerelle.apps.cartads_cs.models.FTP'):
connector.jobs()
assert Job.objects.filter(method_name='pack', status='completed').count()
assert Job.objects.filter(method_name='send_to_cartads', status='completed').count()
@ -416,7 +416,7 @@ def test_send_notification_error(connector, app, cached_data):
# test_pack
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client:
client.return_value = mock.Mock(service=FakeService())
with mock.patch('passerelle.apps.cartads_cs.models.FTP') as FTP:
with mock.patch('passerelle.apps.cartads_cs.models.FTP'):
connector.jobs()
assert Job.objects.filter(method_name='pack', status='completed').count()
assert Job.objects.filter(method_name='send_to_cartads', status='completed').count()
@ -619,7 +619,6 @@ def test_daact_pieces_management(connector, app, cached_data):
def test_list_of_files(connector, app, cached_data):
CartaDSDossier.objects.all().delete()
test_send(connector, app, cached_data)
dossier = CartaDSDossier.objects.all()[0]
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client:
client.return_value = mock.Mock(service=FakeService())

View File

@ -14,7 +14,7 @@ def test_cron_frequencies(db):
def test_cron_error(db, caplog):
connector = BaseAdresse.objects.create(slug='base-adresse')
BaseAdresse.objects.create(slug='base-adresse')
excep = Exception('hello')
with mock.patch(
'passerelle.apps.base_adresse.models.AddressResource.hourly', new=mock.Mock(side_effect=excep)

View File

@ -152,7 +152,7 @@ def test_default_column_keynames(setup, filetype):
def test_sheet_name_error(setup, app, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
app = login(app)
resp = app.get('/manage/csvdatasource/test/edit')
edit_form = resp.forms[0]
@ -167,7 +167,7 @@ def test_sheet_name_error(setup, app, filetype, admin_user):
def test_unfiltered_data(client, setup, filetype):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
_, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
resp = client.get(url)
result = parse_response(resp)
for item in result:
@ -176,7 +176,7 @@ def test_unfiltered_data(client, setup, filetype):
def test_empty_file(client, setup):
csvdata, url = setup(
_, url = setup(
'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods')
)
resp = client.get(url)
@ -185,14 +185,14 @@ def test_empty_file(client, setup):
def test_view_manage_page(setup, app, filetype, admin_user):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
csvdata, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
app = login(app)
app.get(csvdata.get_absolute_url())
def test_good_filter_data(client, setup, filetype):
filter_criteria = 'Zakia'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
@ -205,7 +205,7 @@ def test_good_filter_data(client, setup, filetype):
def test_bad_filter_data(client, setup, filetype):
filter_criteria = 'bad'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
@ -213,7 +213,7 @@ def test_bad_filter_data(client, setup, filetype):
def test_useless_filter_data(client, setup, filetype):
csvdata, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
_, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Ali'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -221,7 +221,7 @@ def test_useless_filter_data(client, setup, filetype):
def test_columns_keynames_with_spaces(client, setup, filetype):
csvdata, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
_, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Yaniss'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -229,7 +229,7 @@ def test_columns_keynames_with_spaces(client, setup, filetype):
def test_skipped_header_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
filters = {'q': 'Eliot'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -237,7 +237,7 @@ def test_skipped_header_data(client, setup, filetype):
def test_data(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Sacha'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -246,7 +246,7 @@ def test_data(client, setup, filetype):
def test_unicode_filter_data(client, setup, filetype):
filter_criteria = u'Benoît'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
@ -258,7 +258,7 @@ def test_unicode_filter_data(client, setup, filetype):
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filter_criteria = u'anaëlle'
filters = {'text': filter_criteria, 'case-insensitive': ''}
resp = client.get(url, filters)
@ -271,7 +271,7 @@ def test_unicode_case_insensitive_filter_data(client, setup, filetype):
def test_data_bom(client, setup):
csvdata, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
_, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
filters = {'text': 'Eliot'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -279,7 +279,7 @@ def test_data_bom(client, setup):
def test_multi_filter(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'sexe': 'F'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -288,7 +288,7 @@ def test_multi_filter(client, setup, filetype):
def test_query(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'liot'}
resp = client.get(url, filters)
result = parse_response(resp)
@ -297,7 +297,7 @@ def test_query(client, setup, filetype):
def test_query_insensitive_and_unicode(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'elIo', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
@ -311,7 +311,7 @@ def test_query_insensitive_and_unicode(client, setup, filetype):
def test_query_insensitive_and_filter(client, setup, filetype):
csvdata, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
_, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
@ -342,7 +342,7 @@ def test_dialect(client, setup):
def test_on_the_fly_dialect_detection(client, setup):
# fake a connector that was not initialized during .save(), because it's
# been migrated and we didn't do dialect detection at save() time.
csvdata, url = setup(data=StringIO(data))
_, url = setup(data=StringIO(data))
CsvDataSource.objects.all().update(_dialect_options=None)
resp = client.get(url)
result = json_loads(resp.content)
@ -351,7 +351,7 @@ def test_on_the_fly_dialect_detection(client, setup):
def test_missing_columns(client, setup):
csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
_, url = setup(data=StringIO(data + 'A;B;C\n'))
resp = client.get(url)
result = json_loads(resp.content)
assert result['err'] == 0
@ -747,7 +747,7 @@ def test_edit_connector_queries(admin_user, app, setup, filetype):
def test_download_file(app, setup, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
assert '/login' in app.get('/manage/csvdatasource/test/download/').location
app = login(app)
resp = app.get('/manage/csvdatasource/test/download/', status=200)
@ -851,7 +851,7 @@ def test_csv_validation(admin_user, app):
def test_change_csv_command(setup):
csv, url = setup(data=StringIO(data))
csv, _ = setup(data=StringIO(data))
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
csv.refresh_from_db()
assert list(csv.get_rows()) == []

View File

@ -498,9 +498,9 @@ def test_family_pending_invoices_by_nameid():
test_orleans_data_import_command()
resource = GenericFamily.objects.get()
family = Family.objects.get(external_id='22380')
link = FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1')
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1')
family = Family.objects.get(external_id='1228')
link = FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2')
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2')
links = resource.get_pending_invoices_by_nameid(None)
assert len(links['data']) == 2
for uuid, invoices in links['data'].items():
@ -510,7 +510,7 @@ def test_family_pending_invoices_by_nameid():
def test_incorrect_orleans_data(caplog):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip')
resource = GenericFamily.objects.create(
GenericFamily.objects.create(
title='test orleans',
slug='test-orleans',
archive=File(open(filepath, 'rb'), 'family_incorrect_data_orleans.zip'),

View File

@ -165,7 +165,7 @@ def test_proxy_logger_transaction_id(mocked_send, app, arcgis):
arcgis.log_evel = 'DEBUG'
arcgis.base_url = 'https://example.net/'
arcgis.save()
resp = app.get(
app.get(
'/arcgis/test/mapservice-query',
params={'lon': 6.172122, 'lat': 48.673836, 'service': 'test'},
status=200,
@ -934,7 +934,7 @@ def test_generic_up_in_endpoints_infos(db, app, connector_class, expected):
def test_generic_endpoint_superuser_access(db, app, admin_user, simple_user):
connector = MDEL.objects.create(slug='test')
MDEL.objects.create(slug='test')
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json')
payload = json.load(open(filename))

View File

@ -128,7 +128,7 @@ def test_demand_creation_limit(app, resource, freezer):
'card_demand_purpose': 1,
'cards_quantity': 1,
}
for count in range(20):
for i in range(20):
response = app.post_json('/gesbac/test/create-demand/', params=payload)
assert response.json['err'] == 0

View File

@ -211,7 +211,7 @@ def test_greco_create_ok_no_application(mocked_post, app, conn):
payload = copy.copy(CREATE_PAYLOAD)
del payload['application']
resp = app.post_json(url, params=payload)
app.post_json(url, params=payload)
assert mocked_post.call_count == 2
def to_json(root):

View File

@ -122,7 +122,7 @@ def clear():
def test_export_csvdatasource(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.save()

View File

@ -136,7 +136,7 @@ def test_create_file(mocked_post, mocked_get, app, conn):
)
response._content = force_bytes(soap_response)
mocked_post.return_value = response
title, ext = filename.split('.')
title, _ = filename.split('.')
base64_data = 'VGVzdCBEb2N1bWVudA=='
data = {
'type': typ,
@ -469,9 +469,6 @@ def test_invalid_response(mocked_post, mocked_get, app, conn):
response = Response()
response.status_code = 502
soap_response = open(
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb'
).read()
response._content = '<p>Bad Gateway</p>'
response.raison = 'Bad Gateway'
mocked_post.return_value = response

View File

@ -138,14 +138,12 @@ def test_jsondatastore_template(app, jsondatastore):
jsondatastore.text_value_template = '{{foo}}'
jsondatastore.save()
resp = app.post_json('/jsondatastore/foobar/data/create', params={'foo': 'bar'})
uuid = resp.json['id']
resp = app.get('/jsondatastore/foobar/data/')
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['text'] == 'bar'
# check entries are alphabetically sorted
resp = app.post_json('/jsondatastore/foobar/data/create', params={'foo': 'aaa'})
uuid = resp.json['id']
resp = app.get('/jsondatastore/foobar/data/')
assert len(resp.json['data']) == 2
assert resp.json['data'][0]['text'] == 'aaa'

View File

@ -529,7 +529,7 @@ def test_manager_import_export(app, admin_user):
title='a title',
description='a description',
)
csv2 = CsvDataSource.objects.create(
CsvDataSource.objects.create(
csv_file=File(data, 't.csv'),
columns_keynames='id, text',
slug='test2',

View File

@ -311,7 +311,7 @@ def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload):
AEC_PAYLOAD = dict(aec_payload)
display_id = AEC_PAYLOAD['display_id']
AEC_PAYLOAD['fields']['logitud_commentaire_usager'] = 'gentle user comment'
resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)
app.post_json('/mdel/test/create', params=aec_payload, status=200)
# checking that attached files are referenced in -ent-.xml file
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id)
@ -648,11 +648,11 @@ def test_date_parsing():
from passerelle.utils.jsonresponse import APIError
with pytest.raises(APIError) as error:
date = parse_date('2018-02-29')
parse_date('2018-02-29')
assert 'day is out of range for month' in str(error)
with pytest.raises(APIError) as error:
date = parse_date('28-02-2018')
parse_date('28-02-2018')
for text in ('date', '28-02-2018', 'not iso-formated'):
assert text in str(error)

View File

@ -594,7 +594,7 @@ def test_typename_parameter_upgrade(mocked_get, server_responses, version, typen
endpoint = tests.utils.generic_endpoint_url('opengis', 'features', slug=connector.slug)
assert endpoint == '/opengis/test/features'
mocked_get.side_effect = server_responses
resp = app.get(endpoint, params={'type_names': '...', 'property_name': '...'})
app.get(endpoint, params={'type_names': '...', 'property_name': '...'})
assert mocked_get.call_args[1]['params']['request'] == 'GetFeature'
assert mocked_get.call_args[1]['params']['version'] == version
assert typename_label in mocked_get.call_args[1]['params'].keys()

View File

@ -235,11 +235,11 @@ def test_photon_addresses_qs_coordinates(mocked_get, app, photon):
photon.latitude = 1.2
photon.longitude = 2.1
photon.save()
resp = app.get('/photon/%s/addresses?q=plop' % photon.slug)
app.get('/photon/%s/addresses?q=plop' % photon.slug)
assert 'lat=%s' % photon.latitude in mocked_get.call_args[0][0]
assert 'lon=%s' % photon.longitude in mocked_get.call_args[0][0]
resp = app.get('/photon/%s/addresses?q=plop&lat=42&lon=43' % photon.slug)
app.get('/photon/%s/addresses?q=plop&lat=42&lon=43' % photon.slug)
assert 'lat=42' in mocked_get.call_args[0][0]
assert 'lon=43' in mocked_get.call_args[0][0]

View File

@ -456,7 +456,6 @@ def test_create_and_publish(app, connector, token):
def test_create_wrong_payload(app, connector, token):
endpoint = tests.utils.generic_endpoint_url('plone-restapi', 'create', slug=connector.slug)
assert endpoint == '/plone-restapi/my_connector/create'
url = connector.service_url + '/braine-l-alleud'
payload = 'not json'
resp = app.post(endpoint + '?uri=braine-l-alleud', params=payload, status=400)
assert resp.json['err']
@ -492,7 +491,6 @@ def test_update(app, connector, token):
def test_update_wrong_payload(app, connector, token):
endpoint = tests.utils.generic_endpoint_url('plone-restapi', 'update', slug=connector.slug)
assert endpoint == '/plone-restapi/my_connector/update'
url = connector.service_url + '/braine-l-alleud/dccd85d12cf54b6899dff41e5a56ee7f'
query_string = '?uri=braine-l-alleud&uid=dccd85d12cf54b6899dff41e5a56ee7f'
payload = 'not json'
resp = app.post(endpoint + query_string, params=payload, status=400)

View File

@ -22,7 +22,7 @@ from tests.test_availability import down_mock, up_mock
@pytest.fixture
def connector():
connector, created = Feed.objects.get_or_create(slug='some-slug')
connector, _ = Feed.objects.get_or_create(slug='some-slug')
connector.set_log_level('DEBUG')
connector.url = 'http://example.net/'
connector.save()
@ -391,7 +391,7 @@ def test_proxy_logger_email_traceback(app, db, email_handler, settings, mailoutb
raise requests.ConnectionError('timeout')
monkeypatch.setattr(Feed, 'json', json)
resp = app.get(endpoint_url, status=500)
app.get(endpoint_url, status=500)
assert any('Traceback:' in mail.body for mail in mailoutbox)

View File

@ -131,7 +131,7 @@ def test_log_error_http_max_sizes(caplog, log_level, settings):
settings.LOGGED_RESPONSES_MAX_SIZE = 7
with HTTMock(http400_mock):
requests = Request(logger=logger)
response = requests.post(url, json={'name': 'josh'})
requests.post(url, json={'name': 'josh'})
if logger.level == 10: # DEBUG
records = [record for record in caplog.records if record.name == 'requests']
@ -181,7 +181,7 @@ def test_skip_content_type(mocked_get, caplog, endpoint_response):
logger = logging.getLogger('requests')
logger.setLevel(logging.DEBUG)
requests = Request(logger=logger)
response = requests.get('http://example.net/whatever').text
requests.get('http://example.net/whatever').text
records = [record for record in caplog.records if record.name == 'requests']
if 'xml' in endpoint_response.headers.get('Content-Type'):
@ -267,7 +267,7 @@ def test_resource_hawk_auth(mocked_send, caplog, endpoint_response):
credentials = {'id': 'id', 'key': 'key', 'algorithm': 'sha256'}
hawk_auth = HawkAuth(**credentials)
resp = request.get('http://httpbin.org/get', auth=hawk_auth)
request.get('http://httpbin.org/get', auth=hawk_auth)
prepared_method = mocked_send.call_args[0][0]
assert 'Authorization' in prepared_method.headers
generated_header = prepared_method.headers['Authorization']
@ -288,7 +288,7 @@ def test_resource_hawk_auth(mocked_send, caplog, endpoint_response):
assert dict(generated_parts) == dict(expected_parts)
hawk_auth = HawkAuth(ext='extra attribute', **credentials)
resp = request.post('http://httpbin.org/post', auth=hawk_auth, json={'key': 'value'})
request.post('http://httpbin.org/post', auth=hawk_auth, json={'key': 'value'})
prepared_method = mocked_send.call_args[0][0]
assert 'Authorization' in prepared_method.headers
generated_header = prepared_method.headers['Authorization']

View File

@ -265,7 +265,7 @@ def test_sms_max_message_length(app, connector):
}
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function:
send_function.return_value = None
result = app.post_json(path, params=payload)
app.post_json(path, params=payload)
connector.jobs()
assert send_function.call_args[1]['text'] == 'a' * connector.max_message_length
@ -282,7 +282,7 @@ def test_sms_log(app, connector):
}
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function:
send_function.return_value = 1
result = app.post_json(path, params=payload)
app.post_json(path, params=payload)
connector.jobs()
assert SMSLog.objects.filter(
appname=connector.get_connector_slug(), slug=connector.slug, credits=1
@ -290,7 +290,7 @@ def test_sms_log(app, connector):
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function:
send_function.return_value = 2
result = app.post_json(path, params=payload)
app.post_json(path, params=payload)
connector.jobs()
assert SMSLog.objects.filter(
appname=connector.get_connector_slug(), slug=connector.slug, credits=2
@ -336,7 +336,7 @@ def test_sms_nostop_parameter(app, connector):
)
with send_patch as send_function:
send_function.return_value = None
result = app.post_json(base_path, params=payload)
app.post_json(base_path, params=payload)
connector.jobs()
assert send_function.call_args[1]['text'] == 'not a spam'
assert send_function.call_args[1]['stop'] == ('nostop' not in path)
@ -466,7 +466,7 @@ def test_ovh_new_api_credit(app, freezer, admin_user):
'creditsLeft': 123,
}
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.jobs()
connector.refresh_from_db()
assert connector.credit_left == 123
@ -478,7 +478,7 @@ def test_ovh_new_api_credit(app, freezer, admin_user):
resp = {
'creditsLeft': 456,
}
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.hourly()
assert connector.credit_left == 456
@ -504,13 +504,13 @@ def test_ovh_alert_emails(app, freezer, mailoutbox):
freezer.move_to('2019-01-01 00:00:00')
resp = {'creditsLeft': 101}
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.hourly()
assert len(mailoutbox) == 0
resp = {'creditsLeft': 99}
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.hourly()
assert len(mailoutbox) == 1
@ -527,12 +527,12 @@ def test_ovh_alert_emails(app, freezer, mailoutbox):
freezer.move_to('2019-01-01 12:00:00')
resp = {'creditsLeft': 99}
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.hourly()
assert len(mailoutbox) == 0
freezer.move_to('2019-01-02 01:00:07')
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
with tests.utils.mock_url(ovh_url, resp, 200):
connector.hourly()
assert len(mailoutbox) == 1
@ -584,12 +584,12 @@ def test_ovh_token_request_error(admin_user, app):
ovh_request_token_url = 'https://eu.api.ovh.com/1.0/auth/credential'
ovh_response = {'message': 'Invalid application key'}
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401) as mocked:
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401):
resp = resp.click('request access').follow()
assert 'error requesting token: Invalid application key.' in resp.text
ovh_response = 'not-json'
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401) as mocked:
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401):
resp = resp.click('request access').follow()
assert 'error requesting token: bad JSON response' in resp.text
@ -680,11 +680,11 @@ def test_api_statistics(app, freezer, connector, admin_user):
assert len(resp.json['data']['series'][0]['data']) == 0
freezer.move_to('2021-01-01 12:00')
for _ in range(5):
for i in range(5):
SMSLog.objects.create(appname='ovh', slug='ovhsmsgateway')
freezer.move_to('2021-02-03 13:00')
for _ in range(3):
for i in range(3):
SMSLog.objects.create(appname='ovh', slug='ovhsmsgateway')
freezer.move_to('2021-02-06 13:00')

View File

@ -33,7 +33,7 @@ def test_render_body_schemas(db):
connector_model = app.get_connector_model()
if connector_model is None:
continue
for name, method in inspect.getmembers(connector_model, predicate):
for _, method in inspect.getmembers(connector_model, predicate):
if not hasattr(method, 'endpoint_info'):
continue
if method.endpoint_info.post and method.endpoint_info.post.get('request_body', {}).get(

View File

@ -599,7 +599,7 @@ def test_create_intervention_timeout_error(mocked_uuid, app, smart):
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID)
def test_create_intervention_inconsistency_id_error(mocked_uuid4, app, freezer, smart):
freezer.move_to('2021-07-08 00:00:00')
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
wcs_request = smart.wcs_requests.get(uuid=UUID)
assert wcs_request.status == 'registered'
job = Job.objects.get(method_name='create_intervention_job')
@ -619,7 +619,7 @@ def test_create_intervention_inconsistency_id_error(mocked_uuid4, app, freezer,
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID)
def test_create_intervention_content_error(mocked_uuid, app, freezer, smart):
freezer.move_to('2021-07-08 00:00:00')
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
wcs_request = smart.wcs_requests.get(uuid=UUID)
assert wcs_request.status == 'registered'
assert 'invalid json' in wcs_request.result
@ -632,7 +632,7 @@ def test_create_intervention_content_error(mocked_uuid, app, freezer, smart):
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID)
def test_create_intervention_client_error(mocked_uuid, app, freezer, smart):
freezer.move_to('2021-07-08 00:00:00')
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
wcs_request = smart.wcs_requests.get(uuid=UUID)
assert '400 Client Error' in wcs_request.result
assert wcs_request.tries == 1
@ -739,7 +739,7 @@ def test_update_intervention(mocked_uuid, app, smart, wcs_service):
assert CREATE_INTERVENTION_QUERY[
'notification_url'
] == 'http://testserver/toulouse-smart/test/update-intervention?uuid=%s' % str(UUID)
wcs_request = smart.wcs_requests.get(uuid=UUID)
smart.wcs_requests.get(uuid=UUID)
mocked_push = mock.patch(
"passerelle.contrib.toulouse_smart.models.SmartRequest.push",
@ -989,8 +989,7 @@ def test_add_media_with_create_intervention_failure(mocked_uuid, app, smart):
wcs_request = smart.wcs_requests.get(uuid=UUID)
wcs_request.status = 'failed'
wcs_request.save()
wcs_request_file = wcs_request.files.get(**job.parameters)
path = wcs_request_file.content.path
wcs_request.files.get(**job.parameters)
smart.jobs()
job = Job.objects.get(method_name='add_media_job')

View File

@ -15,7 +15,7 @@ def generic_endpoint_url(connector, endpoint, slug='test'):
def setup_access_rights(obj):
api, created = ApiUser.objects.get_or_create(username='all', keytype='', key='')
api, _ = ApiUser.objects.get_or_create(username='all', keytype='', key='')
obj_type = ContentType.objects.get_for_model(obj)
AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=obj.pk)
return obj