229 lines
9.5 KiB
Python
229 lines
9.5 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from wcs.blocks import BlockDef
|
|
from wcs.carddef import CardDef
|
|
from wcs.categories import (
|
|
BlockCategory,
|
|
CardDefCategory,
|
|
Category,
|
|
DataSourceCategory,
|
|
MailTemplateCategory,
|
|
WorkflowCategory,
|
|
)
|
|
from wcs.data_sources import NamedDataSource
|
|
from wcs.formdef import FormDef
|
|
from wcs.mail_templates import MailTemplate
|
|
from wcs.workflows import Workflow
|
|
from wcs.wscalls import NamedWsCall
|
|
|
|
from wcs.ctl.management.commands import TenantCommand
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
class Command(TenantCommand):
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument('-d', '--domain', '--vhost', required=True, metavar='DOMAIN')
|
|
parser.add_argument('args', nargs=argparse.REMAINDER)
|
|
|
|
def handle(self, *args, **options):
|
|
domain = options.pop('domain')
|
|
self.init_tenant_publisher(domain)
|
|
|
|
self.directory = args[0]
|
|
self.import_categories()
|
|
self.import_datasources()
|
|
self.import_wscalls()
|
|
self.import_mail_templates()
|
|
self.import_workflows()
|
|
self.import_blocks()
|
|
self.import_carddefs()
|
|
self.import_formdefs()
|
|
|
|
def import_categories(self):
|
|
for category_klass in (
|
|
Category,
|
|
CardDefCategory,
|
|
WorkflowCategory,
|
|
BlockCategory,
|
|
MailTemplateCategory,
|
|
DataSourceCategory,
|
|
):
|
|
dirname = category_klass.xml_root_node
|
|
if not os.path.exists(os.path.join(self.directory, dirname)):
|
|
continue
|
|
for filename in os.listdir(os.path.join(self.directory, dirname)):
|
|
logging.info('Importing %s %s', dirname, filename)
|
|
category = category_klass.import_from_xml(
|
|
open(os.path.join(self.directory, dirname, filename))
|
|
)
|
|
try:
|
|
existing_category = category_klass.get_by_urlname(category.url_name)
|
|
except KeyError:
|
|
category.store()
|
|
except Exception as e:
|
|
raise Exception('failed to load existing category %r' % filename) from e
|
|
else:
|
|
# replace
|
|
category.id = existing_category.id
|
|
category.store()
|
|
|
|
def import_datasources(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'datasources')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'datasources')):
|
|
logging.info('Importing datasource %s', filename)
|
|
datasource = NamedDataSource.import_from_xml(
|
|
open(os.path.join(self.directory, 'datasources', filename))
|
|
)
|
|
try:
|
|
existing_datasource = NamedDataSource.get_by_slug(datasource.slug, ignore_errors=False)
|
|
except KeyError:
|
|
datasource.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing datasource %r' % filename) from e
|
|
else:
|
|
# replace
|
|
datasource.id = existing_datasource.id
|
|
datasource.store(comment='Indus Update')
|
|
|
|
def import_mail_templates(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'mail-templates')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'mail-templates')):
|
|
logging.info('Importing mail template %s', filename)
|
|
mail_template = MailTemplate.import_from_xml(
|
|
open(os.path.join(self.directory, 'mail-templates', filename))
|
|
)
|
|
try:
|
|
existing_mail_template = MailTemplate.get_by_slug(mail_template.slug)
|
|
except Exception as e:
|
|
raise Exception('failed to load existing mail template %r' % filename) from e
|
|
if existing_mail_template is None:
|
|
mail_template.store()
|
|
else:
|
|
# replace
|
|
mail_template.id = existing_mail_template.id
|
|
mail_template.store()
|
|
|
|
def import_workflows(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'workflows')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'workflows')):
|
|
logging.info('Importing workflow %s', filename)
|
|
workflow = Workflow.import_from_xml(
|
|
open(os.path.join(self.directory, 'workflows', filename)),
|
|
include_id=False,
|
|
check_datasources=True,
|
|
)
|
|
try:
|
|
existing_workflow = [
|
|
x for x in Workflow.select(ignore_errors=True) if x and x.name == workflow.name
|
|
][0]
|
|
except IndexError:
|
|
workflow.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing workflow %r' % filename) from e
|
|
else:
|
|
# replace
|
|
workflow.id = existing_workflow.id
|
|
workflow.store(comment='Indus Update')
|
|
|
|
def import_wscalls(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'wscalls')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'wscalls')):
|
|
logging.info('Importing wscall %s', filename)
|
|
wscall = NamedWsCall.import_from_xml(open(os.path.join(self.directory, 'wscalls', filename)))
|
|
try:
|
|
existing_wscall = NamedWsCall.get(filename, ignore_errors=False)
|
|
except KeyError:
|
|
wscall.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing wscall %r' % filename) from e
|
|
else:
|
|
# replace
|
|
wscall.id = existing_wscall.id
|
|
wscall.store(comment='Indus Update')
|
|
|
|
def import_blocks(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'blocks')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'blocks')):
|
|
logging.info('Importing block %s', filename)
|
|
fd = open(os.path.join(self.directory, 'blocks', filename))
|
|
tree = ET.parse(fd)
|
|
# do not use import_from_xml to avoid autofixing url_name.
|
|
blockdef = BlockDef.import_from_xml_tree(tree, include_id=False)
|
|
try:
|
|
existing_blockdef = BlockDef.get_on_index(blockdef.slug, 'slug')
|
|
except KeyError:
|
|
blockdef.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing block %r' % filename) from e
|
|
else:
|
|
# replace
|
|
blockdef.id = existing_blockdef.id
|
|
blockdef.store(comment='Indus Update')
|
|
|
|
def import_carddefs(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'carddefs')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'carddefs')):
|
|
logging.info('Importing carddef %s', filename)
|
|
fd = open(os.path.join(self.directory, 'carddefs', filename))
|
|
tree = ET.parse(fd)
|
|
# do not use import_from_xml to avoid autofixing url_name.
|
|
carddef = CardDef.import_from_xml_tree(tree, include_id=False)
|
|
try:
|
|
existing_carddef = CardDef.get_by_urlname(carddef.url_name)
|
|
except KeyError:
|
|
carddef.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing carddef %r' % filename) from e
|
|
else:
|
|
# partial update
|
|
for attribute in (
|
|
'fields',
|
|
'digest_template',
|
|
'lateral_template',
|
|
'submission_lateral_template',
|
|
'geolocations',
|
|
):
|
|
if hasattr(carddef, attribute):
|
|
setattr(existing_carddef, attribute, getattr(carddef, attribute))
|
|
existing_carddef.store(comment='Indus Update')
|
|
|
|
def import_formdefs(self):
|
|
if not os.path.exists(os.path.join(self.directory, 'forms')):
|
|
return
|
|
for filename in os.listdir(os.path.join(self.directory, 'forms')):
|
|
logging.info('Importing formdef %s', filename)
|
|
fd = open(os.path.join(self.directory, 'forms', filename))
|
|
tree = ET.parse(fd)
|
|
# do not use import_from_xml to avoid autofixing url_name.
|
|
formdef = FormDef.import_from_xml_tree(tree, include_id=False)
|
|
try:
|
|
existing_formdef = FormDef.get_by_urlname(formdef.url_name)
|
|
except KeyError:
|
|
formdef.store(comment='Indus Initial Import')
|
|
except Exception as e:
|
|
raise Exception('failed to load existing formdef %r' % filename) from e
|
|
else:
|
|
# partial update
|
|
for attribute in (
|
|
'fields',
|
|
'digest_template',
|
|
'lateral_template',
|
|
'submission_lateral_template',
|
|
'user_support',
|
|
'geolocations',
|
|
):
|
|
if hasattr(formdef, attribute):
|
|
setattr(existing_formdef, attribute, getattr(formdef, attribute))
|
|
existing_formdef.store(comment='Indus Update')
|