172 lines
6.7 KiB
Python
172 lines
6.7 KiB
Python
# passerelle-imio-liege-lisrue - passerelle connector to Lisrue webservice
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
import unicodedata
from urllib.parse import quote

import requests
from django.db import models
from django.utils.encoding import force_str
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
|
|
|
|
|
|
class ImioLiegeLisrue(BaseResource):
    """Passerelle connector that looks up streets through a remote SIG web service.

    Two open endpoints are exposed:

    * ``voies`` queries the ``jsonruebelgique`` or ``jsonlgrue12`` resource
      below ``service_url`` (depending on ``include_all_of_belgium``);
    * ``voies_namur`` queries ``service_url`` directly with ``outSrid`` and
      ``q`` query-string parameters.

    Both return ``{'data': [...]}`` where each entry has at least ``id`` and
    ``text`` keys.
    """

    PROFILE_CHOICES = (
        ('LIEGE', 'Liege'),
        ('LIEGE_V2', 'Liege lisrue2'),
        ('NAMUR', 'Namur'),
    )

    # Seconds to wait for the remote web service; without a timeout a stalled
    # server would block the worker handling the request indefinitely.
    REQUEST_TIMEOUT = 30

    service_url = models.CharField(
        max_length=128,
        blank=False,
        verbose_name=_('Service URL'),
        help_text=_('SIG Web Service URL (ex: https://e-services.liege.be:8443/)'),
    )
    # NOTE(review): the default 'Liege' is not one of the PROFILE_CHOICES keys
    # ('LIEGE', 'LIEGE_V2', 'NAMUR'). It is left untouched here because changing
    # a field default requires a schema migration; the code below only ever
    # compares the profile against 'LIEGE_V2'.
    profile = models.CharField(max_length=100, choices=PROFILE_CHOICES, default='Liege')
    include_all_of_belgium = models.BooleanField(default=True, verbose_name=_('Include all of Belgium'))
    street_with_postal_code = models.BooleanField(
        default=False, verbose_name=_('Return street with postal code')
    )

    verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))

    category = _('Geographic information system')

    class Meta:
        verbose_name = _('Liege Lisrue Service')

    @classmethod
    def get_verbose_name(cls):
        """Return the human readable name declared in ``Meta.verbose_name``."""
        return cls._meta.verbose_name

    @classmethod
    def get_icon_class(cls):
        """Return the icon class name used for this connector."""
        return 'gis'

    @classmethod
    def get_connector_slug(cls):
        """Return the slug identifying this connector."""
        return 'imio-liege-lisrue'

    @staticmethod
    def _normalize_query(q):
        """Return *q* without accents, lower-cased and percent-encoded.

        The term is decomposed (NFKD) and reduced to ASCII, as the connector
        always did, then percent-encoded so that characters such as ``&``,
        ``#``, ``?`` or ``/`` typed by the user cannot alter the structure of
        the URL sent to the remote service.
        """
        ascii_term = force_str(unicodedata.normalize('NFKD', q).encode('ascii', 'ignore'))
        return quote(ascii_term.lower(), safe='')

    def _get_json(self, url):
        """GET *url* on the remote service and return the decoded JSON body.

        Raises whatever ``requests`` raises on network errors or timeout, and
        the JSON decoding error raised by ``Response.json()`` on an invalid
        body.
        """
        response = requests.get(
            url,
            headers={'Accept': 'application/json'},
            verify=self.verify_cert,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    @staticmethod
    def _extract_street_items(result):
        """Return the list of street dictionaries found in a ``voies`` payload.

        The street collection is read from the ``data`` key when present,
        otherwise from ``rues``. It may be a list, or a dictionary holding the
        street(s) under ``return`` or ``rue``. Any other shape yields an empty
        list instead of raising.
        """
        if not isinstance(result, dict):
            return []
        rues = result['data'] if 'data' in result else result.get('rues')
        if isinstance(rues, dict):
            if 'return' in rues:
                rues = rues['return']
            elif 'rue' in rues:
                rues = rues['rue']
            else:
                return []
            # A lone street comes as a single object; wrap it so callers can
            # always iterate. A list is used as is rather than wrapped again.
            if not isinstance(rues, list):
                rues = [rues]
        elif not isinstance(rues, list):
            return []
        return [item for item in rues if isinstance(item, dict)]

    @endpoint(perm='OPEN')
    def voies(self, request, q=None, **kwargs):
        """Search streets matching *q* and return them as ``{'data': [...]}``.

        :param request: the incoming HTTP request (not read directly).
        :param q: optional search term; when empty the remote resource is
            queried without any term appended.
        :returns: a dictionary whose ``data`` list holds one ``{'id', 'text'}``
            entry per distinct street label. With the ``LIEGE_V2`` profile and
            ``include_all_of_belgium`` disabled, entries also carry
            ``codeCommissariat`` and ``libelleCommissariat``.
        """
        # Both profiles query the same remote resources; only the all-of-Belgium
        # switch selects the resource.
        if self.include_all_of_belgium:
            url = (
                self.service_url
                + 'jsonruebelgique/findByValue?indexName=RUE&startResult=0&maxResult=30&value='
            )
        else:
            url = self.service_url + 'jsonlgrue12/'
        if q:
            url += self._normalize_query(q)

        result = self._get_json(url)

        # The commissariat details are only added for LIEGE_V2 on the
        # 'jsonlgrue12' resource. The former extra test on item['statutVO'] was
        # or-ed with "'jsonlgrue12' in url", which always holds on that
        # resource, so every item was (and still is) kept.
        with_commissariat = self.profile == 'LIEGE_V2' and not self.include_all_of_belgium

        streets = []
        known_street_labels = set()
        for item in self._extract_street_items(result):
            if self.include_all_of_belgium:
                street_label = item.get('libelle')
            elif item.get('libelleMinuscule'):
                street_label = '%s %s' % (
                    item.get('particuleMinuscule') or '',
                    item.get('libelleMinuscule'),
                )
            else:
                continue
            # Items without a usable label cannot be displayed; skip them
            # rather than failing on None or emitting a "None - ..." entry.
            if not street_label:
                continue
            # NOTE(review): duplicates are detected on the bare label, before
            # the postal code is appended, so two streets sharing a label but
            # not a postal code collapse into one entry - confirm intended.
            if street_label in known_street_labels:
                continue
            known_street_labels.add(street_label)

            postal_code = item.get('codePostal')
            if self.street_with_postal_code and postal_code is not None:
                street_label = u'{} - {}'.format(street_label, postal_code)
            street_label = street_label.strip()

            street = {
                'id': item.get('codeRue') or item.get('codeStreet'),
                'text': street_label,
            }
            if with_commissariat:
                commissariat = item.get('commissariat')
                street['codeCommissariat'] = (
                    commissariat.get('codeCommissariat') if commissariat else ''
                )
                street['libelleCommissariat'] = (
                    commissariat.get('libelleCommissariat') if commissariat else ''
                )
            streets.append(street)
        return {'data': streets}

    @endpoint(perm='OPEN')
    def voies_namur(self, request, q=None, postCode=None, **kwargs):
        """Search streets matching *q*, optionally restricted to *postCode*.

        :param request: the incoming HTTP request (not read directly).
        :param q: search term; when ``None`` no remote call is made and an
            empty result is returned.
        :param postCode: optional postal code; when given, only items whose
            ``postCode`` has the same string representation are kept.
        :returns: ``{'data': [...]}`` with ``id``, ``text`` and ``postCode``
            keys for each kept street.
        """
        if q is None:
            return {'data': []}

        url = '{}?outSrid=31370&q={}'.format(self.service_url, self._normalize_query(q))
        result = self._get_json(url)
        # Anything but a list (for instance an error object) has no streets.
        if not isinstance(result, list):
            return {'data': []}

        # The filter comes from the query string as text while the payload may
        # carry a number (assumption - TODO confirm); compare both as strings.
        wanted_post_code = None if postCode is None else str(postCode)

        streets = []
        for item in result:
            if not isinstance(item, dict):
                continue
            if wanted_post_code is not None and str(item.get('postCode')) != wanted_post_code:
                continue
            streets.append(
                {
                    'id': item.get('rueCode'),
                    'text': item.get('rueNom'),
                    'postCode': item.get('postCode'),
                }
            )
        return {'data': streets}
|