wscall: allow storing in backoffice file without varname (#25784)

This commit is contained in:
Thomas NOËL 2018-08-23 14:20:38 +02:00
parent ec8efb6497
commit f5857ca745
2 changed files with 48 additions and 13 deletions

View File

@ -1736,16 +1736,41 @@ def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
item.parent = st1
item.backoffice_filefield_id = 'bo1'
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'file-bo1.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith('<?xml')
# nothing else is stored
assert formdata.workflow_data is None
assert not formdata.evolution[-1].parts
# store in backoffice file field + varname
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item.varname = 'xxx'
item.perform(formdata)
# backoffice file field
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'xxx.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith('<?xml')
# varname => workflow_data and AttachmentEvolutionPart
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
attachment.fp.seek(0)
assert attachment.fp.read(5) == '<?xml'
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
@ -1768,6 +1793,7 @@ def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
item.perform(formdata)
assert formdata.data == {}
def test_webservice_target_status(pub):
wf = Workflow(name='boo')
status1 = wf.add_status('Status1', 'st1')

View File

@ -322,6 +322,13 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
formdata.update_workflow_data(workflow_data)
formdata.store()
if self.backoffice_filefield_id:
if (status // 100) == 2 and app_error_code == 0 and status not in (204, 205):
filename, content_type = self.get_attachment_data(response)
self.store_in_backoffice_filefield(
formdata, self.backoffice_filefield_id,
filename, content_type, data)
if app_error_code != 0:
self.action_on_error(self.action_on_app_error, formdata, response, data=data)
if (status // 100) == 4:
@ -329,6 +336,19 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
if (status // 100) == 5:
self.action_on_error(self.action_on_5xx, formdata, response, data=data)
def get_attachment_data(self, response):
content_type = response.headers.get('content-type') or ''
if content_type:
content_type = content_type.split(';')[0].strip().lower()
extension = mimetypes.guess_extension(content_type, strict=False) or ''
if self.varname:
filename = '%s%s' % (self.varname, extension)
elif self.backoffice_filefield_id:
filename = 'file-%s%s' % (self.backoffice_filefield_id, extension)
else:
filename = 'file%s' % extension
return filename, content_type
def store_response(self, formdata, response, data, workflow_data):
if self.response_type == 'json':
try:
@ -346,25 +366,14 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
elif d.get('display_id'):
formdata.id_display = d.get('display_id')
else: # store result as attachment
content_type = response.headers.get('content-type') or ''
if content_type:
content_type = content_type.split(';')[0].strip().lower()
filename, content_type = self.get_attachment_data(response)
workflow_data['%s_content_type' % self.varname] = content_type
workflow_data['%s_length' % self.varname] = len(data)
extension = mimetypes.guess_extension(content_type, strict=False) or ''
filename = '%s%s' % (self.varname, extension)
fp_content = StringIO(data)
attachment = AttachmentEvolutionPart(filename, fp_content,
content_type=content_type,
varname=self.varname)
formdata.evolution[-1].add_part(attachment)
if self.backoffice_filefield_id:
self.store_in_backoffice_filefield(
formdata,
self.backoffice_filefield_id,
filename,
content_type,
data)
def action_on_error(self, action, formdata, response=None, data=None, exc_info=None):
if action in (':pass', ':stop') and (self.notify_on_errors or self.record_errors):