This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
tabularfile/tabularfile/csv.py

145 lines
4.5 KiB
Python

# tabularfile.csv - simple csv reader and writer
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import contextlib
import datetime
import io
from .common import TabularFileError
# Work around https://github.com/Ousret/charset_normalizer/issues/33:
# when charset_normalizer is importable, rebind the python_version_tuple name
# in its normalizer module to a callable returning the first three fields of
# sys.version_info as a tuple.
try:
    import charset_normalizer.normalizer
    import sys
    charset_normalizer.normalizer.python_version_tuple = lambda: tuple(sys.version_info)[:3]
except ImportError:
    # charset_normalizer is not installed: there is nothing to patch
    pass
class Reader:
    """Iterate over the rows of a CSV file opened in binary mode.

    The text encoding and the CSV dialect are detected from the beginning of
    the file unless they are given explicitly.  A reader can be iterated
    several times: the stream is rewound to its starting position after each
    pass.
    """

    # number of bytes handed to the encoding detector
    _ENCODING_SAMPLE_SIZE = 1024 * 64
    # the dialect sniffer is fed a sample growing by _SNIFF_CHUNK_SIZE
    # characters at each of its (at most) _SNIFF_ATTEMPTS attempts
    _SNIFF_CHUNK_SIZE = 1024 * 5
    _SNIFF_ATTEMPTS = 10

    def __init__(self, fh, encoding=None, dialect=None, delimiter=None, quotechar=None, **kwargs):
        """Prepare the reading of `fh`.

        fh -- seekable binary file object, read from its current position
        encoding -- name of the text encoding; None, '' or 'autodetect'
            trigger a detection with charamel
        dialect, delimiter, quotechar -- passed to csv.reader() when set; if
            none of them is given the dialect is sniffed from the content
        kwargs -- accepted and ignored

        Raises TabularFileError if the encoding must be detected but charamel
        cannot be imported.
        """
        offset = fh.tell()
        self.encoding = encoding
        if not self.encoding or self.encoding == 'autodetect':
            self.encoding = self._detect_encoding(fh.read(self._ENCODING_SAMPLE_SIZE))
        fh.seek(offset)

        text_fh = io.TextIOWrapper(fh, encoding=self.encoding)
        # position (as reported by the text layer) every iteration starts from
        self._offset = text_fh.tell()

        # Only forward the parameters which were really given: csv.reader()
        # rejects an explicit None for delimiter or quotechar.
        explicit = {'dialect': dialect, 'delimiter': delimiter, 'quotechar': quotechar}
        self.csv_kwargs = {name: value for name, value in explicit.items() if value}
        if not self.csv_kwargs:
            self.csv_kwargs = self._sniff_csv_kwargs(text_fh)
        text_fh.seek(self._offset)
        self.text_fh = text_fh

    @staticmethod
    def _detect_encoding(sample):
        """Return the encoding detected by charamel for the bytes `sample`."""
        try:
            import charamel
        except ImportError:
            charamel = None
        if charamel is None:
            # NOTE(review): the message also mentions charset_normalizer but
            # only charamel is tried here.
            raise TabularFileError('encoding cannot be autodetected, please install charamel or charset_normalizer')
        return charamel.Detector().detect(sample) or 'utf-8'

    def _sniff_csv_kwargs(self, text_fh):
        """Return the csv.reader() keyword arguments guessed from `text_fh`."""
        sniffer = csv.Sniffer()
        sample = ''
        for attempt in range(self._SNIFF_ATTEMPTS):
            chunk = text_fh.read(self._SNIFF_CHUNK_SIZE)
            if attempt and not chunk:
                # end of file: the sample did not grow, sniffing it again
                # would fail the same way
                break
            sample += chunk
            try:
                return {'dialect': csv_dialect} if (csv_dialect := None) else {'dialect': sniffer.sniff(sample)}
            except csv.Error:
                continue
        # dummy detection: pick the most frequent of ',' and ';' on the first
        # line; an empty file has no first line and gets ','
        lines = sample.splitlines()
        first_line = lines[0] if lines else ''
        delimiter = ',' if first_line.count(',') >= first_line.count(';') else ';'
        return {'delimiter': delimiter}

    def __iter__(self):
        """Yield each row as a list of strings.

        Any exception raised while reading or parsing is re-raised as
        TabularFileError('parsing error'); the stream is rewound when the
        iteration ends, whatever the reason.
        """
        try:
            yield from csv.reader(self.text_fh, **self.csv_kwargs)
        except Exception as e:
            raise TabularFileError('parsing error') from e
        finally:
            self.text_fh.seek(self._offset)

    @property
    def sheets(self):
        """List of sheet identifiers, always [0]."""
        return [0]
class Writer:
    """Write rows as CSV to a text stream, formatting dates on the way.

    datetime.datetime cells are rendered with `datetime_format`,
    datetime.date cells with `date_format`, every other cell with str().
    """

    def __init__(self,
                 text_fh,
                 dialect=None,
                 delimiter=None,
                 quotechar=None,
                 date_format='%Y-%m-%d',
                 datetime_format='%Y-%m-%dT%H:%M:%S%z',
                 **kwargs):
        """Build the underlying csv.writer() on `text_fh`.

        dialect, delimiter, quotechar -- forwarded to csv.writer() when set;
            the Excel dialect is used if none of them is given
        date_format, datetime_format -- strftime() patterns
        kwargs -- accepted and ignored
        """
        if not (dialect or delimiter or quotechar):
            dialect = csv.excel
        candidates = (
            ('dialect', dialect),
            ('delimiter', delimiter),
            ('quotechar', quotechar),
        )
        # only the options holding a value reach csv.writer()
        options = {name: value for name, value in candidates if value}
        self.csv_writer = csv.writer(text_fh, **options)
        self.date_format = date_format
        self.datetime_format = datetime_format

    def _format_cell(self, cell):
        """Return the text written for one cell."""
        # datetime must be checked first: it is a subclass of date
        if isinstance(cell, datetime.datetime):
            return cell.strftime(self.datetime_format)
        if isinstance(cell, datetime.date):
            return cell.strftime(self.date_format)
        return str(cell)

    def writerow(self, row):
        """Write one row, an iterable of cells."""
        self.csv_writer.writerow(self._format_cell(cell) for cell in row)

    def writerows(self, rows):
        """Write every row of the iterable `rows`."""
        for current in rows:
            self.writerow(current)
@contextlib.contextmanager
def writer(fh, encoding='utf-8', **kwargs):
    """Context manager yielding a Writer encoding its output into `fh`.

    fh -- writable binary file object; it is left open on exit
    encoding -- text encoding of the produced file
    kwargs -- forwarded to Writer

    The yielded Writer must not be used once the context is exited: the text
    layer is flushed and detached from `fh` at that point.
    """
    # newline='' disables newline translation, as the csv module asks for:
    # the csv writer emits its own line terminators.
    text_fh = io.TextIOWrapper(fh, encoding=encoding, newline='')
    try:
        yield Writer(text_fh, **kwargs)
    finally:
        text_fh.flush()
        # Without detach() the garbage collection of the wrapper would close
        # the caller's file object.
        text_fh.detach()