400 lines
15 KiB
Python
400 lines
15 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2021 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import csv
|
|
import datetime
|
|
import mimetypes
|
|
import os
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files.base import ContentFile
|
|
from django.db import models, transaction
|
|
from django.db.models import Q
|
|
from django.http import HttpResponse
|
|
from django.utils.encoding import force_bytes, force_str, smart_text
|
|
from django.utils.timezone import now
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
|
|
PARITY_ALL = 0
|
|
PARITY_ODD = 1
|
|
PARITY_EVEN = 2
|
|
|
|
PARITY_CHOICES = (
|
|
(PARITY_ALL, _('all')),
|
|
(PARITY_ODD, _('odd')),
|
|
(PARITY_EVEN, _('even')),
|
|
)
|
|
|
|
CSV_TITLES = [
|
|
'street_id',
|
|
'street_name',
|
|
'parity',
|
|
'min_housenumber',
|
|
'max_housenumber',
|
|
'sector_id',
|
|
'sector_name',
|
|
]
|
|
|
|
MAX_HOUSENUMBER = 999_999
|
|
|
|
|
|
def upload_to(instance, filename):
|
|
return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
|
|
|
|
|
|
class SectorResource(BaseResource):
|
|
csv_file = models.FileField(
|
|
_('Sectorization file'),
|
|
upload_to=upload_to,
|
|
help_text=_('CSV file'),
|
|
)
|
|
titles_in_first_line = models.BooleanField(
|
|
_('First line defines column titles'),
|
|
default=True,
|
|
help_text=_('If not, column titles are: %s, …') % ','.join(CSV_TITLES),
|
|
)
|
|
|
|
category = _('Geographic information system')
|
|
hide_description_fields = ['csv_file']
|
|
|
|
def __str__(self):
|
|
return '%s [%s]' % (self.title, self.slug)
|
|
|
|
def daily(self):
|
|
super().daily()
|
|
self.clean_old_csv_files()
|
|
|
|
def clean_old_csv_files(self):
|
|
if not os.path.exists(self.csv_file.path):
|
|
return
|
|
base_dir = os.path.dirname(self.csv_file.path)
|
|
if os.path.dirname(self.csv_file.name) != os.path.join(self.get_connector_slug(), self.slug):
|
|
# path is not compliant with upload_to, do nothing
|
|
return
|
|
|
|
for filename in os.listdir(base_dir):
|
|
filepath = os.path.join(base_dir, filename)
|
|
if not os.path.isfile(filepath):
|
|
continue
|
|
if os.path.basename(self.csv_file.name) == filename:
|
|
# current file
|
|
continue
|
|
mtime = os.stat(filepath).st_mtime
|
|
if mtime > (now() + datetime.timedelta(days=-7)).timestamp():
|
|
# too young
|
|
continue
|
|
|
|
if getattr(settings, 'SECTOR_REMOVE_ON_CLEAN', False) is True:
|
|
# remove
|
|
os.unlink(filepath)
|
|
else:
|
|
# move file in unused-files dir
|
|
unused_dir = os.path.join(base_dir, 'unused-files')
|
|
os.makedirs(unused_dir, exist_ok=True)
|
|
os.rename(filepath, os.path.join(unused_dir, filename))
|
|
|
|
def clean(self, *args, **kwargs):
|
|
try:
|
|
self.import_csv(validate_only=True)
|
|
except Exception as e:
|
|
raise ValidationError(_('Invalid CSV file: %s') % e)
|
|
return super().clean(*args, **kwargs)
|
|
|
|
def save(self, *args, **kwargs):
|
|
import_csv = kwargs.pop('import_csv', True)
|
|
result = super().save(*args, **kwargs)
|
|
if import_csv:
|
|
self.import_csv()
|
|
return result
|
|
|
|
@transaction.atomic
|
|
def import_csv(self, validate_only=False):
|
|
try:
|
|
self.csv_file.seek(0)
|
|
content = force_bytes(self.csv_file.read())
|
|
content = force_str(content.decode('utf-8-sig', 'ignore').encode('utf-8')) # handle BOM
|
|
dialect = csv.Sniffer().sniff(content)
|
|
reader = csv.reader(content.splitlines(), dialect)
|
|
except Exception as e:
|
|
raise ValidationError(_('failed to read CSV (%s)') % e)
|
|
if self.titles_in_first_line:
|
|
first_line = next(reader)
|
|
titles = [name.strip().lower() for name in first_line]
|
|
if not set(CSV_TITLES).issubset(titles):
|
|
raise ValidationError(
|
|
_('missing column(s) in header: %s.') % ', '.join(set(CSV_TITLES) - set(titles))
|
|
)
|
|
else:
|
|
titles = CSV_TITLES
|
|
indexes = [titles.index(t) for t in titles if t]
|
|
captions = [titles[i] for i in indexes]
|
|
|
|
# now ready to import data, first delete all sectors
|
|
# sectorization will also be deleted (cascade)
|
|
# (will be cancelled by transaction on any import error)
|
|
if not validate_only:
|
|
self.sector_set.all().delete()
|
|
|
|
def get_cell(row, index):
|
|
try:
|
|
return row[index]
|
|
except IndexError:
|
|
return ''
|
|
|
|
sector_id = sector_name = None
|
|
sectors = {}
|
|
for row in reader:
|
|
if not row:
|
|
continue # do not consider empty lines
|
|
row = [smart_text(x).strip() for x in row]
|
|
row = {caption: get_cell(row, index) for caption, index in zip(captions, indexes)}
|
|
|
|
if row['sector_id']:
|
|
sector_id = row['sector_id']
|
|
sector_name = row['sector_name'] or row['sector_id']
|
|
if not sector_id:
|
|
raise ValidationError(_('missing sector_id, line %s') % reader.line_num)
|
|
if not row['street_id']:
|
|
raise ValidationError(_('missing street_id, line %s') % reader.line_num)
|
|
if not row['parity']:
|
|
parity = PARITY_ALL
|
|
elif row['parity'].lower()[0] in (str(PARITY_EVEN), 'e', 'p'): # p = pair (even, in french)
|
|
parity = PARITY_EVEN
|
|
elif row['parity'].lower()[0] in (str(PARITY_ODD), 'o', 'i'): # i = impair (odd, in french)
|
|
parity = PARITY_ODD
|
|
else:
|
|
parity = PARITY_ALL
|
|
try:
|
|
min_housenumber = int(row['min_housenumber'])
|
|
except (ValueError, TypeError):
|
|
min_housenumber = 0
|
|
try:
|
|
max_housenumber = int(row['max_housenumber'])
|
|
except (ValueError, TypeError):
|
|
max_housenumber = MAX_HOUSENUMBER
|
|
|
|
if not validate_only:
|
|
if sector_id not in sectors:
|
|
sectors[sector_id] = self.sector_set.create(
|
|
resource=self, slug=sector_id, title=sector_name
|
|
)
|
|
sectors[sector_id].sectorization_set.create(
|
|
sector=sectors[sector_id],
|
|
street_id=row['street_id'],
|
|
parity=parity,
|
|
min_housenumber=min_housenumber,
|
|
max_housenumber=max_housenumber,
|
|
street_name=row['street_name'],
|
|
)
|
|
|
|
@endpoint(
|
|
description=_('Update sectorization with a CSV file'),
|
|
display_category=_('Management'),
|
|
perm='can_access',
|
|
methods=['put'],
|
|
)
|
|
def update(self, request):
|
|
ext = mimetypes.guess_extension(request.content_type)
|
|
if not ext:
|
|
raise APIError(
|
|
"can't guess filename extension for '%s' content type" % request.content_type, http_status=400
|
|
)
|
|
name = self.csv_file.storage.get_available_name('api-uploaded-file' + ext)
|
|
self.csv_file = ContentFile(content=request.body, name=name)
|
|
try:
|
|
self.clean()
|
|
except ValidationError as e:
|
|
raise APIError(e, http_status=400)
|
|
self.save()
|
|
return {
|
|
'updated': self.csv_file.name,
|
|
'data': [
|
|
{
|
|
'id': sector.slug,
|
|
'text': sector.title,
|
|
}
|
|
for sector in self.sector_set.all()
|
|
],
|
|
}
|
|
|
|
@endpoint(
|
|
description=_('Get sectorization as a CSV file'),
|
|
display_category=_('Management'),
|
|
perm='can_access',
|
|
parameters={
|
|
'even': {
|
|
'description': _('Even numbers indicator (default: %s)') % PARITY_EVEN,
|
|
'example_value': 'P',
|
|
},
|
|
'odd': {
|
|
'description': _('Odd numbers indicator (default: %s)') % PARITY_ODD,
|
|
'example_value': 'I',
|
|
},
|
|
'mix': {
|
|
'description': _('Even or odd numbers indicator (default: %s)') % PARITY_ALL,
|
|
'example_value': '',
|
|
},
|
|
'limits': {'description': _('Show housenumber min/max (0/%s)') % MAX_HOUSENUMBER, 'type': 'bool'},
|
|
'repeat': {'description': _('Repeat sector id and name on all lines'), 'type': 'bool'},
|
|
},
|
|
)
|
|
def export(self, request, even=None, odd=None, mix=None, limits=False, repeat=False):
|
|
response = HttpResponse(content_type='text/csv')
|
|
date = now().strftime('%Y-%m-%d_%H:%M')
|
|
response['Content-Disposition'] = 'attachment; filename="sector-%s-%s.csv"' % (self.slug, date)
|
|
writer = csv.writer(response, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
|
|
if self.titles_in_first_line:
|
|
writer.writerow(CSV_TITLES)
|
|
parity = dict(PARITY_CHOICES)
|
|
if even is not None:
|
|
parity[PARITY_EVEN] = even
|
|
if odd is not None:
|
|
parity[PARITY_ODD] = odd
|
|
if mix is not None:
|
|
parity[PARITY_ALL] = mix
|
|
for sector in self.sector_set.all().order_by('slug'):
|
|
first = True
|
|
for sectorization in sector.sectorization_set.all().order_by('street_id'):
|
|
writer.writerow(
|
|
[
|
|
sectorization.street_id,
|
|
sectorization.street_name,
|
|
parity.get(sectorization.parity, mix),
|
|
sectorization.min_housenumber if (limits or sectorization.min_housenumber) else '',
|
|
sectorization.max_housenumber
|
|
if (limits or sectorization.max_housenumber < MAX_HOUSENUMBER)
|
|
else '',
|
|
sector.slug if (repeat or first) else '',
|
|
sector.title if (repeat or first) else '',
|
|
]
|
|
)
|
|
first = False
|
|
return response
|
|
|
|
@endpoint(
|
|
name='sectors',
|
|
description=_('List of Sectors'),
|
|
perm='can_access',
|
|
display_category=_('Data sources'),
|
|
parameters={
|
|
'id': {'description': _('Sector identifier (slug)')},
|
|
'q': {'description': _('Filter by Sector Title or Identifier')},
|
|
'street_id': {'description': _('Get sectors for this Street identifier')},
|
|
'house_number': {
|
|
'description': _('Get sectors by this House Number (requires a street_id)'),
|
|
'type': 'integer',
|
|
},
|
|
},
|
|
)
|
|
def sectors(self, request, q=None, id=None, street_id=None, house_number=None):
|
|
if house_number and not street_id:
|
|
raise APIError('house_number requires a street_id', http_status=400)
|
|
|
|
# search by street and house number
|
|
if street_id:
|
|
query = Sectorization.objects.filter(sector__resource=self, street_id=street_id)
|
|
if house_number is not None:
|
|
house_number = int(house_number)
|
|
query = query.filter(min_housenumber__lte=house_number, max_housenumber__gte=house_number)
|
|
parity = PARITY_ODD if house_number % 2 else PARITY_EVEN
|
|
query = query.filter(Q(parity=PARITY_ALL) | Q(parity=parity))
|
|
else:
|
|
query = query.filter(parity=PARITY_ALL, min_housenumber=0, max_housenumber=MAX_HOUSENUMBER)
|
|
return {
|
|
'data': [
|
|
{
|
|
'id': sectorization.sector.slug,
|
|
'text': sectorization.sector.title,
|
|
}
|
|
]
|
|
for sectorization in query.reverse()
|
|
}
|
|
|
|
# list of sectors
|
|
query = self.sector_set.all()
|
|
if id is not None:
|
|
query = query.filter(slug=id)
|
|
elif q is not None:
|
|
query = query.filter(Q(slug__icontains=q) | Q(title__icontains=q))
|
|
return {
|
|
'data': [
|
|
{
|
|
'id': sector.slug,
|
|
'text': sector.title,
|
|
}
|
|
for sector in query
|
|
]
|
|
}
|
|
|
|
|
|
class Sector(models.Model):
|
|
resource = models.ForeignKey(SectorResource, on_delete=models.CASCADE)
|
|
title = models.CharField(max_length=256, verbose_name=_('Title'))
|
|
slug = models.CharField(max_length=128, verbose_name=_('Identifier'))
|
|
|
|
class Meta:
|
|
ordering = ['resource', 'slug']
|
|
unique_together = ('resource', 'slug')
|
|
|
|
def __str__(self):
|
|
return '%s > %s [%s]' % (self.resource, self.title, self.slug)
|
|
|
|
|
|
class Sectorization(models.Model):
|
|
sector = models.ForeignKey(Sector, on_delete=models.CASCADE, verbose_name=_('Sector'))
|
|
street_id = models.CharField(max_length=64, verbose_name=_('Street Identifier'))
|
|
street_name = models.CharField(max_length=150, verbose_name=_('Street Name'), blank=True)
|
|
parity = models.SmallIntegerField(
|
|
choices=PARITY_CHOICES, default=PARITY_ALL, verbose_name=_('Parity of numbers')
|
|
)
|
|
min_housenumber = models.PositiveIntegerField(
|
|
default=0, verbose_name=_('Minimum house number (included)')
|
|
)
|
|
max_housenumber = models.PositiveIntegerField(
|
|
default=MAX_HOUSENUMBER, verbose_name=_('Maximum house number (included)')
|
|
)
|
|
|
|
class Meta:
|
|
ordering = ['street_id', 'min_housenumber', 'parity']
|
|
|
|
def __str__(self):
|
|
if self.street_name:
|
|
street = '%s (%s)' % (self.street_id, self.street_name)
|
|
else:
|
|
street = self.street_id
|
|
return '%s, parity:%s, min:%s, max:%s → %s' % (
|
|
street,
|
|
dict(PARITY_CHOICES).get(self.parity),
|
|
self.min_housenumber,
|
|
self.max_housenumber,
|
|
self.sector,
|
|
)
|
|
|
|
@property
|
|
def resource(self):
|
|
return self.sector.resource
|
|
|
|
def clean(self):
|
|
if not self.max_housenumber or self.max_housenumber > MAX_HOUSENUMBER:
|
|
self.max_housenumber = MAX_HOUSENUMBER
|
|
if self.min_housenumber > self.max_housenumber:
|
|
raise ValidationError(_('Minimal house number may not be lesser than maximal house number.'))
|