passerelle-imio-liege-lisrue/passerelle_imio_liege_lisrue/models.py

172 lines
6.7 KiB
Python

# passerelle-imio-liege-lisrue - passerelle connector to Lisrue webservice
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unicodedata
import requests
from django.db import models
from django.utils.encoding import force_str
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
class ImioLiegeLisrue(BaseResource):
PROFILE_CHOICES = (
('LIEGE', 'Liege'),
('LIEGE_V2', 'Liege lisrue2'),
('NAMUR', 'Namur'),
)
service_url = models.CharField(
max_length=128,
blank=False,
verbose_name=_('Service URL'),
help_text=_('SIG Web Service URL (ex: https://e-services.liege.be:8443/)'),
)
profile = models.CharField(max_length=100, choices=PROFILE_CHOICES, default='Liege')
include_all_of_belgium = models.BooleanField(default=True, verbose_name=_('Include all of Belgium'))
street_with_postal_code = models.BooleanField(
default=False, verbose_name=_('Return street with postal code')
)
verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
category = _('Geographic information system')
class Meta:
verbose_name = _('Liege Lisrue Service')
@classmethod
def get_verbose_name(cls):
return cls._meta.verbose_name
@classmethod
def get_icon_class(cls):
return 'gis'
@classmethod
def get_connector_slug(cls):
return 'imio-liege-lisrue'
@endpoint(perm='OPEN')
def voies(self, request, q=None, **kwargs):
url = self.service_url
if self.profile == 'LIEGE_V2':
if self.include_all_of_belgium:
url += 'jsonruebelgique/findByValue?indexName=RUE&startResult=0&maxResult=30&value='
else:
url += 'jsonlgrue12/'
# url += "jsonlgrue2com/" <== commissariat.
else:
if self.include_all_of_belgium:
# url += "jsonlisrue"
url += 'jsonruebelgique/findByValue?indexName=RUE&startResult=0&maxResult=30&value='
else:
# url += "jsonlisrue2"
url += 'jsonlgrue12/'
if q:
q = force_str(unicodedata.normalize('NFKD', request.GET['q']).encode('ascii', 'ignore'))
url += q.lower()
result = requests.get(url, headers={'Accept': 'application/json'}, verify=self.verify_cert).json()
if 'data' in result:
result['rues'] = result.pop('data')
if isinstance(result['rues'], list):
lisrues = result['rues']
elif isinstance(result['rues'], dict) and 'return' in result['rues']:
lisrues = [result['rues']['return']]
elif isinstance(result['rues'], dict) and 'rue' in result['rues']:
lisrues = [result['rues']['rue']]
else:
lisrues = []
streets = []
known_street_labels = {}
for item in lisrues:
if self.include_all_of_belgium:
street_label = item.get('libelle')
elif item.get('libelleMinuscule'):
street_label = '%s %s' % (
item.get('particuleMinuscule') or '',
item.get('libelleMinuscule'),
)
else:
continue
if street_label in known_street_labels:
continue
known_street_labels[street_label] = True
if self.street_with_postal_code:
street_label = u'{} - {}'.format(street_label, str(item.get('codePostal'))).strip()
else:
street_label = street_label.strip()
if self.profile == 'LIEGE_V2':
if self.include_all_of_belgium:
streets.append(
{
'id': item.get('codeRue') or item.get('codeStreet'),
'text': street_label,
}
)
elif (
item.get('statutVO') is not None and item.get('statutVO').get('code') == 1
) or 'jsonlgrue12' in url:
streets.append(
{
'id': item.get('codeRue') or item.get('codeStreet'),
'text': street_label,
'codeCommissariat': item.get('commissariat').get('codeCommissariat')
if item.get('commissariat')
else '',
'libelleCommissariat': item.get('commissariat').get('libelleCommissariat')
if item.get('commissariat')
else '',
}
)
else:
continue
else:
streets.append(
{
'id': item.get('codeRue') or item.get('codeStreet'),
'text': street_label,
}
)
return {'data': streets}
@endpoint(perm='OPEN')
def voies_namur(self, request, q=None, postCode=None, **kwargs):
if q is None:
return {'data': []}
else:
url = self.service_url
url = '{}?outSrid=31370'.format(url)
q = force_str(unicodedata.normalize('NFKD', request.GET['q']).encode('ascii', 'ignore'))
url = '{}&q={}'.format(url, q.lower())
result = requests.get(url, headers={'Accept': 'application/json'}, verify=self.verify_cert).json()
streets = []
for item in result:
if item.get('postCode') == postCode or postCode is None:
street_label = item.get('rueNom')
streets.append(
{
'id': item.get('rueCode'),
'text': street_label,
'postCode': item.get('postCode'),
}
)
return {'data': streets}