mdel: keep sending demands (#86379)

- add Demand.sent attribute
- set Demand.sent to True after pushing the demand
- retry after one hour in case of failure
This commit is contained in:
Benjamin Dauvergne 2024-01-31 13:28:47 +01:00
parent df0084d202
commit 3ad6c89068
3 changed files with 84 additions and 19 deletions

View File

@ -0,0 +1,22 @@
# Generated by Django 3.2.18 on 2024-01-31 12:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mdel', '0006_auto_20210126_1440'),
]
operations = [
migrations.AddField(
model_name='demand',
name='sent',
field=models.BooleanField(default=True),
),
migrations.AlterField(
model_name='demand',
name='sent',
field=models.BooleanField(default=False),
),
]

View File

@ -14,12 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import os
import shutil
import zipfile
from django.db import models
from django.db import models, transaction
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
@ -91,6 +93,7 @@ class MDEL(BaseResource):
return cls._meta.verbose_name
@endpoint(methods=['post'])
@transaction.atomic
def create(self, request, *args, **kwargs):
"""Create a demand"""
formdata = json.loads(request.body)
@ -168,14 +171,6 @@ class MDEL(BaseResource):
def output_dir(self):
return os.path.join(mdel.get_resource_base_dir(), self.slug, 'outputs')
def send_demand(self, demand_id):
try:
demand = Demand.objects.get(demand_id=demand_id)
except Demand.DoesNotExist:
return
with self.outcoming_sftp.client() as client:
client.put(demand.filepath, os.path.join(client.getcwd(), demand.filename))
def get_response_files(self):
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
@ -189,6 +184,9 @@ class MDEL(BaseResource):
def hourly(self):
super().hourly()
if self.outcoming_sftp:
for demand in self.demand_set.filter(sent=False):
demand.send()
if self.incoming_sftp:
self.get_response_files()
@ -207,6 +205,7 @@ class Demand(models.Model):
status = models.CharField(max_length=32, null=True)
step = models.IntegerField(default=0)
demand_id = models.CharField(max_length=128, null=True)
sent = models.BooleanField(default=False)
def __str__(self):
return '%s - %s - %s' % (self.resource.slug, self.demand_id, self.status)
@ -454,9 +453,27 @@ class Demand(models.Model):
return result
@transaction.atomic
def send(self):
# lock demand
if Demand.objects.select_for_update().get(pk=self.pk).sent:
return
try:
with self.resource.outcoming_sftp.client() as client:
client.put(self.filepath, os.path.join(client.getcwd(), self.filename))
self.sent = True
self.save(update_fields=['sent'])
self.resource.logger.info('Demand %s (pk %s) sent.', self.name, self.pk)
except Exception as e:
if (now() - self.created_at) > datetime.timedelta(days=7):
log_function = self.resource.logger.error
else:
log_function = self.resource.logger.warning
log_function('Demand %s (pk %s) could not be sent: %s', self.name, self.pk, e)
@classmethod
def cleanup(cls):
for instance in cls.objects.all():
for instance in cls.objects.filter(sent=True):
dirname = os.path.join(instance.resource.input_dir, instance.name)
if not os.path.exists(dirname):
continue

View File

@ -18,6 +18,7 @@ import base64
import copy
import json
import os
import pathlib
import shutil
import zipfile
from xml.etree import ElementTree as etree
@ -302,10 +303,6 @@ def test_create_aec_demand_type(app, setup, aec_payload):
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Nom').text == 'Yamamoto'
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Ryu'
assert os.path.exists(basedir)
setup.daily()
assert not os.path.exists(basedir)
def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload):
AEC_PAYLOAD = dict(aec_payload)
@ -322,15 +319,25 @@ def test_create_aec_demand_type_with_user_comment(app, setup, aec_payload):
assert root.find('ns2:LocalAccess/ns2:CommentaireUsager', namespaces=ns).text == 'gentle user comment'
def test_create_aec_demand_with_output_sftp(app, setup, aec_payload, sftpserver):
def test_create_aec_demand_with_output_sftp(app, setup, aec_payload, sftpserver, caplog):
setup.outcoming_sftp = SFTP(
'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
)
setup.save()
resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)
expected_filename = '%s-0.zip' % resp.json['data']['demand_id']
with sftpserver.serve_content({'output': {expected_filename: 'content'}}):
setup.jobs()
app.post_json('/mdel/test/create', params=aec_payload, status=200)
caplog.clear()
demand = setup.demand_set.filter(sent=False).get()
assert not demand.sent
content_object = {'output': {}}
with sftpserver.serve_content(content_object):
setup.hourly()
assert not setup.demand_set.filter(sent=False).exists()
# check zip file was transfered
assert set(content_object['output']) == {demand.filename}
with open(demand.filepath, 'rb') as fd:
assert content_object['output'][demand.filename] == fd.read()
assert caplog.messages[-1:] == [f'Demand {demand.name} (pk {demand.num}) sent.']
def test_create_aec_demand_with_input_sftp(app, setup, aec_payload, sftpserver):
@ -718,3 +725,22 @@ def test_aec_filenames_and_routing(app, setup):
).text
== '15-4-EtatCivil-doc-ActeEtatCivil-XML-1-0.xml'
)
def resource_logs(caplog):
return [record.msg for record in caplog.records if record.name == 'passerelle.resource.mdel.test']
def test_create_aec_demand_with_output_sftp_error(app, setup, aec_payload, sftpserver, caplog):
caplog.set_level('WARNING')
setup.outcoming_sftp = SFTP(
'sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver)
)
setup.save()
app.post_json('/mdel/test/create', params=aec_payload, status=200)
setup.hourly()
demand = setup.demand_set.get()
assert not demand.sent
assert resource_logs(caplog) == ['Demand %s (pk %s) could not be sent: %s']