# combo/combo/data/utils.py
#
# (source listing metadata: 257 lines, 8.8 KiB, Python)

# combo - content management system
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import tarfile
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.serializers.base import DeserializationError
from django.db import transaction
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from combo.apps.assets.models import Asset
from combo.apps.assets.utils import add_tar_content, clean_assets_files, tar_assets_files, untar_assets_files
from .models import Page, extract_context_from_sub_slug
class MissingSubSlug(Exception):
    '''Raised when a page declaring a sub-slug is reached with no URL part left.

    The page concerned is available as the ``page`` attribute.
    '''

    def __init__(self, page):
        # keep the page around so that the caller can act on it.
        self.page = page
class ImportSiteError(Exception):
    '''Base class for the errors raised while importing a site.'''
@python_2_unicode_compatible
class MissingGroups(ImportSiteError):
    '''Import error raised when imported data references unknown groups.

    ``names`` holds the names of the groups that could not be found.
    '''

    def __init__(self, names):
        self.names = names

    def __str__(self):
        # comma-separated list of names inserted in the translated message.
        joined_names = ', '.join(self.names)
        return _('Missing groups: %s') % joined_names
def export_site(pages=True, cartography=True, pwa=True, assets=True, payment=True):
    '''Dump site objects to a JSON-dumpable dictionary.

    Each flag enables one section of the result: ``pages`` -> 'pages',
    ``cartography`` -> 'map-layers', ``assets`` -> 'assets', ``pwa`` -> 'pwa'
    and ``payment`` -> 'payment'.  A section whose application is not listed
    in settings.INSTALLED_APPS is left out whatever the value of its flag.
    '''
    # Models of optional applications are imported only when the application
    # is installed; otherwise the matching section is forced off.
    if 'combo.apps.lingo' in settings.INSTALLED_APPS:
        from combo.apps.lingo.models import PaymentBackend, Regie
    else:
        payment = False

    if 'combo.apps.maps' in settings.INSTALLED_APPS:
        from combo.apps.maps.models import MapLayer
    else:
        cartography = False

    if 'combo.apps.pwa' in settings.INSTALLED_APPS:
        from combo.apps.pwa.models import PwaNavigationEntry, PwaSettings
    else:
        pwa = False

    # (key, enabled, builder) in output order; builders are only called for
    # enabled sections, so they never touch a model that was not imported.
    sections = (
        ('pages', pages, lambda: Page.export_all_for_json()),
        ('map-layers', cartography, lambda: MapLayer.export_all_for_json()),
        ('assets', assets, lambda: Asset.export_all_for_json()),
        (
            'pwa',
            pwa,
            lambda: {
                'settings': PwaSettings.export_for_json(),
                'navigation': PwaNavigationEntry.export_all_for_json(),
            },
        ),
        (
            'payment',
            payment,
            lambda: {
                'backends': PaymentBackend.export_all_for_json(),
                'regies': Regie.export_all_for_json(),
            },
        ),
    )
    return {key: build() for key, enabled, build in sections if enabled}
def import_site(data, if_empty=False, clean=False, request=None):
    '''Load a site export into the database.

    ``data`` is either a dictionary of sections ('pages', 'map-layers',
    'assets', 'pwa', 'payment') or, for old exports, a plain list of pages.

    With ``if_empty`` nothing is imported (and None is returned) when pages,
    or map layers if the maps application is installed, already exist.
    With ``clean`` existing objects are deleted before loading.
    ``request`` is handed over to Page.load_serialized_pages().

    Returns the value of Page.load_serialized_pages().

    Raises MissingGroups if a group referenced by a page or a cell does not
    exist, and ImportSiteError if deserialization fails.
    '''
    if isinstance(data, list):
        # old export form with a list of pages, convert it to the new
        # dictionary format.
        data = {'pages': data}

    # optional applications: import their models only when installed.
    has_payment = 'combo.apps.lingo' in settings.INSTALLED_APPS
    if has_payment:
        from combo.apps.lingo.models import PaymentBackend, Regie

    has_maps = 'combo.apps.maps' in settings.INSTALLED_APPS
    if has_maps:
        from combo.apps.maps.models import MapLayer

    has_pwa = 'combo.apps.pwa' in settings.INSTALLED_APPS
    if has_pwa:
        from combo.apps.pwa.models import PwaNavigationEntry, PwaSettings

    if if_empty:
        if Page.objects.count() or (has_maps and MapLayer.objects.count()):
            return None

    def group_name(ref):
        # a group reference is either the name itself or a sequence whose
        # first item is the name.
        return ref if isinstance(ref, str) else ref[0]

    # check groups used in access control are all available.
    wanted_groups = set()
    for page in data.get('pages') or []:
        wanted_groups.update(group_name(ref) for ref in page['fields']['groups'])
        for cell in page['cells']:
            wanted_groups.update(group_name(ref) for ref in cell['fields']['groups'])

    known_groups = {group.name for group in Group.objects.filter(name__in=wanted_groups)}
    unknown_groups = wanted_groups - known_groups
    if unknown_groups:
        raise MissingGroups(names=sorted(unknown_groups))

    with transaction.atomic():
        if clean:
            if has_maps:
                MapLayer.objects.all().delete()
            Asset.objects.all().delete()
            Page.objects.all().delete()
            if has_pwa:
                PwaSettings.objects.all().delete()
                PwaNavigationEntry.objects.all().delete()
            if has_payment:
                PaymentBackend.objects.all().delete()
                Regie.objects.all().delete()

        try:
            map_layers = data.get('map-layers')
            if map_layers and has_maps:
                MapLayer.load_serialized_objects(map_layers)

            Asset.load_serialized_objects(data.get('assets') or [])
            pages = Page.load_serialized_pages(data.get('pages') or [], request=request)

            pwa_data = data.get('pwa')
            if pwa_data and has_pwa:
                PwaSettings.load_serialized_settings(pwa_data.get('settings'))
                PwaNavigationEntry.load_serialized_objects(pwa_data.get('navigation'))

            payment_data = data.get('payment')
            if payment_data and has_payment:
                PaymentBackend.load_serialized_objects(payment_data.get('backends'))
                Regie.load_serialized_objects(payment_data.get('regies'))
        except DeserializationError as e:
            message = str(e)
            if message.startswith('Page matching query does not exist.'):
                # try to report the slug of the unknown page instead of the
                # raw message; fall back on the raw message if the slug
                # cannot be found in it.
                try:
                    page_slug = message.split("'['")[1].split("']'")[0]
                except IndexError:
                    raise ImportSiteError(message)
                raise ImportSiteError(_('Unknown page "%s".') % page_slug)
            raise ImportSiteError(message)

        return pages
def export_site_tar(fd, export_kwargs=None):
    '''Write a TAR export of the site to the file object ``fd``.

    The archive gets a '_site.json' member with the JSON dump of
    export_site() minus its 'assets' section, then whatever
    tar_assets_files() adds to it.

    ``export_kwargs`` is an optional dictionary of keyword arguments passed
    to export_site().
    '''
    with tarfile.open(mode='w', fileobj=fd) as tar:
        data = export_site(**(export_kwargs or {}))
        # export_site() does not create the 'assets' key when assets are
        # excluded (export_kwargs={'assets': False}); pop() tolerates the
        # missing key where a plain ``del`` raised KeyError.
        # NOTE(review): assets are presumably left out of _site.json because
        # tar_assets_files() stores them separately -- confirm; it is still
        # called when assets were excluded, as before.
        data.pop('assets', None)
        add_tar_content(tar, '_site.json', json.dumps(data, indent=2))
        tar_assets_files(tar)
def import_site_tar(fd, if_empty=False, clean=False, overwrite=False, request=None):
    '''Import a site from the TAR archive readable from file object ``fd``.

    The archive must contain a '_site.json' member; its content, updated
    with the result of untar_assets_files(), is given to import_site().

    With ``if_empty`` nothing is done (and None is returned) when pages, or
    map layers if the maps application is installed, already exist.
    With ``clean`` clean_assets_files() is called before the import.
    ``overwrite`` is passed to untar_assets_files(); ``if_empty``, ``clean``
    and ``request`` are passed to import_site().

    Returns the value of import_site().

    Raises ImportSiteError if the archive has no '_site.json' member.
    '''
    with tarfile.open(mode='r', fileobj=fd) as archive:
        try:
            site_member = archive.getmember('_site.json')
        except KeyError:
            raise ImportSiteError(_('TAR file should provide _site.json file'))

        maps_installed = 'combo.apps.maps' in settings.INSTALLED_APPS
        if maps_installed:
            from combo.apps.maps.models import MapLayer

        # this check is done here, before import_site() does it again, so
        # that assets files are not cleaned when the import is skipped.
        if if_empty and (Page.objects.count() or (maps_installed and MapLayer.objects.count())):
            return None

        if clean:
            clean_assets_files()

        raw_site = archive.extractfile(site_member).read()
        data = json.loads(raw_site.decode('utf-8'))
        data.update(untar_assets_files(archive, overwrite=overwrite))
        return import_site(data, if_empty=if_empty, clean=clean, request=request)
def get_page_from_url_parts(parts, request=None):
    '''Return the page designated by a list of URL path components.

    ``parts`` is walked from left to right, each component being matched
    against page slugs and checked against the page matched just before.
    When a page has a ``sub_slug`` the following component is parsed with
    extract_context_from_sub_slug() and, if ``request`` is given, the result
    is merged into ``request.extra_context_data``.

    Returns the page matching the last component, or None if the path does
    not lead to a page.

    Raises MissingSubSlug if a page with a sub-slug is the last component.
    '''
    # pages that could take part in the path, grouped by slug.
    candidates = {}
    for page in Page.objects.filter(slug__in=parts):
        candidates.setdefault(page.slug, []).append(page)
    if not candidates:
        return None

    # ids of the pages matched so far; the leading None matches the
    # parent_id of pages without a parent.
    ancestor_ids = [None]
    position = 0
    while position < len(parts):
        same_slug_pages = candidates.get(parts[position])
        if not same_slug_pages:
            return None

        if len(same_slug_pages) == 1:
            page = same_slug_pages[0]
        else:
            # multiple pages with same slugs, keep the one placed under the
            # page matched by the previous component.
            page = next((x for x in same_slug_pages if x.parent_id == ancestor_ids[-1]), None)
            if page is None:
                return None

        if page.parent_id != ancestor_ids[-1]:
            if position != 0:
                return None
            # root page should be at root but maybe the page is a child of
            # /index/, and as /index/ is silent the page would appear
            # directly under /; this is not a suggested practice.
            # NOTE(review): with "and" a page is also accepted when its
            # parent has no parent itself, whatever the parent slug --
            # confirm this is intended.
            parent = page.parent
            if parent.slug != 'index' and parent.parent_id is not None:
                return None

        if page.sub_slug:
            if parts[position + 1 :] == []:
                raise MissingSubSlug(page)
            extra = extract_context_from_sub_slug(page.sub_slug, parts[position + 1])
            if extra is None:
                return None
            if request:
                request.extra_context_data.update(extra)
            # skip variable component
            parts = parts[: position + 1] + parts[position + 2 :]

        ancestor_ids.append(page.id)
        position += 1

    return page