This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
themis.libellioimport/themis/libellioimport/migration.py

167 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-
import cPickle
import os
import datetime
import cStringIO
from Products.Five.browser import BrowserView
from Products.CMFCore.utils import getToolByName
from plone.dexterity.factory import DexterityFactory
from plone.dexterity.utils import addContentToContainer
from plone.namedfile.file import NamedFile
from Products.CMFCore.WorkflowCore import WorkflowException
class Migrate(BrowserView):
    """Browser view importing one pickled legacy document into Plone.

    The input stream holds a pickled dict describing the document,
    immediately followed by the raw bytes of the attached file.  On a GET
    request the stream is read from a file of the import directory named by
    the ``filename`` form value; on any other request method it is taken
    from the ``data`` form value.

    Every handled document yields a ``courrier_entrant`` object in the
    context folder.  Some meta types additionally yield a second object in
    the ``documents-diffusables`` folder of the context's parent.

    The view returns a short text message describing the outcome.
    """

    # Directory GET requests are allowed to read import files from.
    _IMPORT_DIR = '/tmp/'

    # Legacy meta_type -> value stored in ``categorie_de_courrier``.
    _MAIL_CATEGORIES = {
        'PFBAddressChange': u'Changement de coordonnées',
        'PFBGeneralMail': u'Courrier général divers',
        'PFBArbitralCourt': u'Relations publiques',
        'PFBSubventionRequest': u'Demande de subvention',
        'PFBActivityAndExpertReport': u'''Rapport d'activités/d'experts''',
        'PFBCollegialNotification': u'Notifications du Collège',
        'PFBMeetingExcuses': u'Excusés',
        'PFBAuditingCommitteeCommunication': u'Communication gouvernementale',
        'PFBGeneralCommission': u'Commissions divers',
        'PFBChallenge': u'Interpellation',
        'PFBWriteAnswer': u'Réponse écrite',
        'PFBCurrentEventsQuestion': u'''Question d'actualité''',
        'PFBOralRequest': u'Question orale',
    }

    # Legacy meta_type -> portal type of the second object to create.
    _SECOND_OBJECT_TYPES = {
        'PFBChallenge': 'interpellationD',
        'PFBWriteAnswer': 'reponse_a_question_ecriteD',
        'PFBCurrentEventsQuestion': 'questionactualiteD',
        'PFBOralRequest': 'QuestionoraleD',
    }

    # Legacy meta types whose first author is a deputy.
    _DEPUTY_META_TYPES = (
        'PFBChallenge', 'PFBCurrentEventsQuestion', 'PFBOralRequest')

    # Portal types of second objects whose authors are deputies.
    _DEPUTY_AUTHORED_TYPES = (
        'interpellationD', 'questionactualiteD', 'QuestionoraleD')

    def __call__(self):
        """Import the document described by the request.

        Returns:
            A text message: an explanation when the request is incomplete
            or the document is skipped, ``u'→ OK\\n'`` once it is imported.

        Raises:
            Whatever unpickling, date parsing or object creation raise on
            malformed input; those errors are not caught here.
        """
        form = self.request.form
        if self.request['REQUEST_METHOD'] == 'GET':
            filename = form.get('filename')
            if not filename:
                return 'Missing filename\n'
            # The name comes from the request: resolve it and refuse
            # anything ('../x', absolute paths, symlinks) that ends up
            # outside the import directory.
            base = os.path.realpath(self._IMPORT_DIR)
            path = os.path.realpath(os.path.join(base, filename))
            if not path.startswith(base + os.sep):
                return 'Invalid filename\n'
            fd = open(path, 'rb')
        else:
            data = form.get('data')
            if data is None:
                return 'Missing data\n'
            fd = cStringIO.StringIO(data)

        # SECURITY NOTE(review): unpickling request-supplied data can
        # execute arbitrary code; this view must only be reachable by
        # trusted users.
        try:
            doc = cPickle.load(fd)
            # Everything following the pickle is the attached file.
            payload = fd.read()
        finally:
            fd.close()

        portal = getToolByName(self.context, 'portal_url').getPortalObject()
        workflow_tool = getToolByName(self.context, 'portal_workflow')

        state = doc.get('state')
        if state not in ('filed', 'preDocumented'):
            return 'Unhandled state (%s)\n' % (state,)

        doc_id = doc.get('id')
        meta_type = doc.get('meta_type')
        if (meta_type == 'PFBArbitralCourt'
                and doc_id.startswith('cour-d-arbitrage')):
            return 'Arbitral Court documents are not to be imported\n'

        title = doc.get('title')
        authors = doc.get('authors') or []
        # Parsed once, before anything is created, so that a malformed
        # date aborts the import early.
        delivery_date = datetime.datetime.strptime(
            doc.get('deliverydate'), '%Y-%m-%d').date()

        # first step is to create a "courrier entrant" object
        # NOTE(review): hasattr() also succeeds for acquired or inherited
        # attributes, so an id clashing with one of them is treated as an
        # already imported document -- confirm this cannot happen.
        if not hasattr(self.context, doc_id):
            factory = DexterityFactory(portal_type='courrier_entrant')
            new_mail = factory(id=doc_id, title=title)
            self.context._setObject(new_mail.id, new_mail)
        ob = getattr(self.context, doc_id)
        ob.date_reception = delivery_date
        ob.numero_courrier = doc.get('mailnumber')
        ob.fichier = NamedFile(payload, filename=unicode(doc.get('filename')))
        # NOTE(review): an unknown meta_type stores [None] here.
        ob.categorie_de_courrier = [self._MAIL_CATEGORIES.get(meta_type)]

        shipper_id = self._get_shipper_id(doc, portal, meta_type, authors)
        ob.expediteur = [shipper_id] if shipper_id else []

        self._try_publish(workflow_tool, ob)

        second_object_typename = self._SECOND_OBJECT_TYPES.get(meta_type)
        if second_object_typename:
            # XXX: this is a temporary location, no decision has been taken
            # yet on the location of those documents
            docdir = getattr(self.context.aq_parent, 'documents-diffusables')
            if hasattr(docdir, doc_id):
                ob2 = getattr(docdir, doc_id)
            else:
                factory = DexterityFactory(portal_type=second_object_typename)
                ob2 = factory(id=doc_id, title=title)
                docdir._setObject(ob2.id, ob2)
            ob2.date_reception = delivery_date
            ob2.mail_ref_id = ob.numero_courrier
            ob2.session = doc.get('session')
            if second_object_typename == 'reponse_a_question_ecriteD':
                if authors:
                    ob2.ministre_auteur_reponse = self.get_ministry(authors[0])
            elif second_object_typename in self._DEPUTY_AUTHORED_TYPES:
                if authors:
                    ob2.auteur = [self.get_deputy(x) for x in authors]
                # Recipients are only recorded on deputy-authored
                # documents.
                recipients = doc.get('recipients')
                if recipients:
                    ob2.ministres_concernes = [
                        self.get_ministry(x) for x in recipients]

        return u'→ OK\n'

    def get_ministry(self, author):
        """Return the ``ministry:<id>`` reference for *author*.

        *author* is expected to be a ``"lastname, firstname"`` string.  The
        matching object is created in ``portal.ministres`` when missing.
        An author whose normalized id is ``college`` maps to
        ``ministry:college`` without any object being created.

        Returns:
            The reference string, or None when *author* is empty.

        Raises:
            ValueError: when a new object is needed and *author* does not
                contain exactly one comma.
        """
        if not author:
            return None
        author, author_id = self._normalize(author)
        if author_id == 'college':
            return 'ministry:college'
        portal = getToolByName(self.context, 'portal_url').getPortalObject()
        self._ensure_person(
            portal.ministres, 'themis.datatypes.ministry', author_id, author)
        return 'ministry:' + author_id

    def get_deputy(self, author):
        """Return the ``deputy:<id>`` reference for *author*.

        *author* is expected to be a ``"lastname, firstname"`` string.  The
        matching object is created in ``portal.deputes`` when missing.

        Returns:
            The reference string, or None when *author* is empty.

        Raises:
            ValueError: when a new object is needed and *author* does not
                contain exactly one comma.
        """
        if not author:
            return None
        author, author_id = self._normalize(author)
        portal = getToolByName(self.context, 'portal_url').getPortalObject()
        self._ensure_person(
            portal.deputes, 'themis.datatypes.deputy', author_id, author)
        return 'deputy:' + author_id

    def _get_shipper_id(self, doc, portal, meta_type, authors):
        """Return the sender reference for the mail, or None if unknown."""
        shippers = doc.get('shippers')
        if shippers:
            shipper, contact_id = self._normalize(shippers[0])
            if not portal.contacts.has_key(contact_id):
                portal.contacts.invokeFactory(
                    'themis.datatypes.contact', contact_id, title=shipper)
            return 'contact:' + contact_id
        if not authors:
            # Nothing to derive a sender from.
            return None
        if meta_type == 'PFBWriteAnswer':
            # shipper is a ministry
            return self.get_ministry(authors[0])
        if meta_type in self._DEPUTY_META_TYPES:
            # shipper is a deputy
            return self.get_deputy(authors[0])
        return None

    def _normalize(self, name):
        """Return *name* as unicode together with its normalized id."""
        if not isinstance(name, unicode):
            # Byte strings are decoded as UTF-8.
            name = unicode(name, 'utf-8')
        plone_tool = getToolByName(self.context, 'plone_utils')
        return name, plone_tool.normalizeString(name)

    def _ensure_person(self, container, portal_type, person_id, name):
        """Create and publish *person_id* in *container* if it is missing."""
        if container.has_key(person_id):
            return
        workflow_tool = getToolByName(self.context, 'portal_workflow')
        lastname, firstname = name.split(',')
        container.invokeFactory(
            portal_type, person_id,
            firstname=firstname.strip(),
            lastname=lastname.strip())
        self._try_publish(workflow_tool, getattr(container, person_id))

    def _try_publish(self, workflow_tool, obj):
        """Run the ``publish`` transition on *obj*, ignoring refusals."""
        try:
            workflow_tool.doActionFor(obj, 'publish')
        except WorkflowException:
            # Best effort: the transition may be unavailable, presumably
            # because the object is already published.
            pass