initial commit

This commit is contained in:
Josue Kouka 2017-06-27 16:05:33 +02:00
commit d0368ac0f5
3 changed files with 205 additions and 0 deletions

152
concerto_2_chrono_format.py Executable file
View File

@ -0,0 +1,152 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vi:filetype=python
# chrono - agendas system
# Copyright (C) 2016-2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import csv
import sys
import json
import datetime
import logging
from logging.config import dictConfig
from itertools import groupby
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
},
'syslog': {
'level': 'DEBUG',
'address': '/dev/log',
'class': 'logging.handlers.SysLogHandler',
'formatter': 'syslog',
},
},
'formatters': {
'syslog': {
'format': '%(levelname)s %(name)s.%(funcName)s: %(message)s',
},
},
'loggers': {
'': {
'handlers': ['console', 'syslog'],
'level': 'DEBUG',
'propagate': True,
}
}
}
dictConfig(LOGGING)
logger = logging.getLogger('hobo-loaders')
_slugify_strip_re = re.compile(r'[^\w\s-]')
_slugify_hyphenate_re = re.compile(r'[-\s]+')
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
def slugify(value):
import unicodedata
if not isinstance(value, unicode):
value = unicode(value, errors='ignore')
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
value = unicode(_slugify_strip_re.sub('', value).strip().lower(), errors='ignore')
return _slugify_hyphenate_re.sub('-', value)
def csv_get_dialect(filename):
with open(filename, 'rb') as fd:
content = fd.read()
content = content.decode('utf-8-sig', 'replace').encode('utf-8')
dialect = csv.Sniffer().sniff(content)
return dialect
def csv_get_dict_data(filename, fieldnames=[], skip_header=True):
with open(filename, 'rb') as fd:
kwargs = {'fieldnames': fieldnames}
kwargs['dialect'] = csv_get_dialect(filename)
reader = csv.DictReader(fd, **kwargs)
if skip_header:
reader.next()
return list(reader)
return False
def json_write_data(json_data, output):
"""Write data into a json file
"""
with open(output, 'w') as f:
json.dump(json_data, f, indent=4, encoding='utf-8', sort_keys=True)
return True
return False
class ConcertoFormatter(object):
name = 'concerto'
output_file = 'concerto.json'
fieldnames = [
'ID_LIE', 'DAT_JOUR_TMP', 'HEU_DEBUT_TMP', 'HEU_FIN_TMP',
'NB_PLACESPOSS_TMP', 'NB_PLACESOCC_TMP', 'NB_PLACESRESFUT_TMP',
'NB_PLACESRES_TMP', 'NB_PLACESLIBRES_TMP', 'NB_DUREE_TMP',
'LIB_NOM_LIE', 'NB_PLACESRESFUTPOND_TMP']
def __init__(self, filename, **kwargs):
self.filename = filename
def format(self):
agendas = []
data = csv_get_dict_data(self.filename, fieldnames=self.fieldnames)
for nursery, events in groupby(data, lambda x: x['LIB_NOM_LIE']):
nursery_slug = slugify(nursery)
agenda = {'slug': nursery_slug, 'label': unicode(nursery, errors='replace'),
'events': [], 'kind': 'events'}
agenda['permissions'] = {'edit': None, 'view': None}
for event in events:
event_day = datetime.datetime.strptime(event['DAT_JOUR_TMP'], '%Y%m%d')
event_time = datetime.datetime.strptime(event['HEU_DEBUT_TMP'], '%H%M')
event_datetime = datetime.datetime.combine(event_day, event_time.time())
# XXX: value such as '-' OR '-76,24'
try:
event_places = int(event['NB_PLACESLIBRES_TMP'].split(',')[0])
except ValueError:
event_places = 0
if event_places < 0:
event_places = 0
agenda['events'].append({'start_datetime': event_datetime.strftime(DATETIME_FORMAT), 'places': event_places})
agendas.append(agenda)
return json_write_data({'agendas': agendas}, self.output_file)
if __name__ == "__main__":
if len(sys.argv) < 2:
logger.error('Invalid agrument number: concerto_2_chrono_format.py <file>')
sys.exit(1)
filename = sys.argv[1]
concerto = ConcertoFormatter(filename)
concerto.format()
sys.exit(0)

9
data.csv Normal file
View File

@ -0,0 +1,9 @@
ID_LIE;DAT_JOUR_TMP;HEU_DEBUT_TMP;HEU_FIN_TMP;NB_PLACESPOSS_TMP;NB_PLACESOCC_TMP;NB_PLACESRESFUT_TMP;NB_PLACESRES_TMP;NB_PLACESLIBRES_TMP;NB_DUREE_TMP;LIB_NOM_LIE;NB_PLACESRESFUTPOND_TMP
16;20170314;1500;1530;45;46;2;49;-3;4;Crèche Collective du BARON;-27,675
16;20170314;1530;1600;45;46;2;49;-3;4;Crèche Collective du BARON;-27,675
16;20170313;1600;1630;45;42,75;0;44,75;2,25;4;Crèche Collective du BARON;
16;20170313;1630;1700;45;42,75;0;44,75;2,25;4;Crèche Collective du BARON;
17;20170314;1000;1030;72;66,25;0;80,5;5,75;4;Crèche Collective des BLOSSIERES;
17;20170314;1030;1100;72;66,25;0;80,5;5,75;4;Crèche Collective des BLOSSIERES;
17;20170314;1300;1330;72;65;0;75;7;4;Crèche Collective des BLOSSIERES;
17;20170314;1330;1400;72;65;0;75;7;4;Crèche Collective des BLOSSIERES;
1 ID_LIE DAT_JOUR_TMP HEU_DEBUT_TMP HEU_FIN_TMP NB_PLACESPOSS_TMP NB_PLACESOCC_TMP NB_PLACESRESFUT_TMP NB_PLACESRES_TMP NB_PLACESLIBRES_TMP NB_DUREE_TMP LIB_NOM_LIE NB_PLACESRESFUTPOND_TMP
2 16 20170314 1500 1530 45 46 2 49 -3 4 Crèche Collective du BARON -27,675
3 16 20170314 1530 1600 45 46 2 49 -3 4 Crèche Collective du BARON -27,675
4 16 20170313 1600 1630 45 42,75 0 44,75 2,25 4 Crèche Collective du BARON
5 16 20170313 1630 1700 45 42,75 0 44,75 2,25 4 Crèche Collective du BARON
6 17 20170314 1000 1030 72 66,25 0 80,5 5,75 4 Crèche Collective des BLOSSIERES
7 17 20170314 1030 1100 72 66,25 0 80,5 5,75 4 Crèche Collective des BLOSSIERES
8 17 20170314 1300 1330 72 65 0 75 7 4 Crèche Collective des BLOSSIERES
9 17 20170314 1330 1400 72 65 0 75 7 4 Crèche Collective des BLOSSIERES

44
tests.py Normal file
View File

@ -0,0 +1,44 @@
import os
import json
from concerto_2_chrono_format import ConcertoFormatter
def test_formatting():
formatter = ConcertoFormatter('data.csv')
formatter.format()
with open('concerto.json', 'rb') as fp:
data = json.load(fp)
agendas = data['agendas']
assert len(agendas) == 2
for agenda in agendas:
if agenda['slug'] == 'crche-collective-du-baron':
assert agenda['kind'] == 'events'
assert agenda['permissions']['edit'] is None
assert agenda['permissions']['view'] is None
assert len(agenda['events']) == 4
for event in agenda['events']:
if event['places'] == 0:
assert event['start_datetime'] in ("2017-03-14 15:00:00",
"2017-03-14 15:30:00")
else:
assert event['start_datetime'] in ("2017-03-13 16:00:00",
"2017-03-13 16:30:00")
else:
assert agenda['kind'] == 'events'
assert agenda['permissions']['edit'] is None
assert agenda['permissions']['view'] is None
assert len(agenda['events']) == 4
for event in agenda['events']:
if event['places'] == 5:
assert event['start_datetime'] in ("2017-03-14 10:00:00",
"2017-03-14 10:30:00")
else:
assert event['start_datetime'] in ("2017-03-14 13:00:00",
"2017-03-14 13:30:00")
def teardown_module(module):
os.remove('concerto.json')