Merge pull request #52 from pyexcel/dev

Release 0.5.9
This commit is contained in:
jaska 2018-08-23 20:25:25 +01:00 committed by GitHub
commit 317e170118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 653 additions and 701 deletions

View File

@ -1,10 +1,19 @@
Change log Change log
================================================================================ ================================================================================
0.5.9 - 23.08.2018
--------------------------------------------------------------------------------
added
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#. `pyexcel#148 <https://github.com/pyexcel/pyexcel/issues/148>`_, support
force_file_type
0.5.8 - 16.08.2018 0.5.8 - 16.08.2018
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
fixed added
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#. `#49 <https://github.com/pyexcel/pyexcel-io/issues/49>`_, support additional #. `#49 <https://github.com/pyexcel/pyexcel-io/issues/49>`_, support additional

View File

@ -2,7 +2,13 @@ name: pyexcel-io
organisation: pyexcel organisation: pyexcel
releases: releases:
- changes: - changes:
- action: fixed - action: added
details:
- '`pyexcel#148`, support force_file_type'
date: 23.08.2018
version: 0.5.9
- changes:
- action: added
details: details:
- '`#49`, support additional options when detecting float values in csv format. default_float_nan, ignore_nan_text' - '`#49`, support additional options when detecting float values in csv format. default_float_nan, ignore_nan_text'
date: 16.08.2018 date: 16.08.2018

View File

@ -29,9 +29,9 @@ copyright = u'2015-2018 Onni Software Ltd.'
author = u'C.W.' author = u'C.W.'
# The short X.Y version # The short X.Y version
version = u'0.5.8' version = u'0.5.9'
# The full version, including alpha/beta/rc tags # The full version, including alpha/beta/rc tags
release = u'0.5.8' release = u'0.5.9'
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------

View File

@ -1,9 +1,9 @@
overrides: "pyexcel.yaml" overrides: "pyexcel.yaml"
name: "pyexcel-io" name: "pyexcel-io"
nick_name: io nick_name: io
version: 0.5.8 version: 0.5.9
current_version: 0.5.8 current_version: 0.5.9
release: 0.5.8 release: 0.5.9
dependencies: dependencies:
- ordereddict;python_version<"2.7" - ordereddict;python_version<"2.7"
- lml==0.0.1 - lml==0.0.1

View File

@ -18,7 +18,9 @@ import pyexcel_io.plugins as plugins
BLACK_LIST = [__name__, "pyexcel_webio", "pyexcel_text"] BLACK_LIST = [__name__, "pyexcel_webio", "pyexcel_text"]
WHITE_LIST = [ WHITE_LIST = [
"pyexcel_io.readers", "pyexcel_io.writers", "pyexcel_io.database" "pyexcel_io.readers",
"pyexcel_io.writers",
"pyexcel_io.database",
] ]
PREFIX = "pyexcel_" PREFIX = "pyexcel_"

View File

@ -33,7 +33,6 @@ try:
except ImportError: except ImportError:
class NullHandler(logging.Handler): class NullHandler(logging.Handler):
def emit(self, record): def emit(self, record):
pass pass
@ -46,7 +45,6 @@ if PY2:
irange = xrange irange = xrange
class Iterator(object): class Iterator(object):
def next(self): def next(self):
return type(self).__next__(self) return type(self).__next__(self)

View File

@ -16,6 +16,7 @@ class RWInterface(object):
""" """
The common methods for book reader and writer The common methods for book reader and writer
""" """
stream_type = None stream_type = None
def __init__(self): def __init__(self):

View File

@ -16,11 +16,21 @@ MESSAGE_INVALID_PARAMETERS = "Invalid parameters"
MESSAGE_ERROR_02 = "No content, file name. Nothing is given" MESSAGE_ERROR_02 = "No content, file name. Nothing is given"
MESSAGE_ERROR_03 = "cannot handle unknown content" MESSAGE_ERROR_03 = "cannot handle unknown content"
MESSAGE_WRONG_IO_INSTANCE = "Wrong io instance is passed for your file format." MESSAGE_WRONG_IO_INSTANCE = "Wrong io instance is passed for your file format."
MESSAGE_CANNOT_WRITE_STREAM_FORMATTER = "Cannot write content of file type %s to stream" MESSAGE_CANNOT_WRITE_STREAM_FORMATTER = (
MESSAGE_CANNOT_READ_STREAM_FORMATTER = "Cannot read content of file type %s from stream" "Cannot write content of file type %s to stream"
MESSAGE_CANNOT_WRITE_FILE_TYPE_FORMATTER = "Cannot write content of file type %s to file %s" )
MESSAGE_CANNOT_READ_FILE_TYPE_FORMATTER = "Cannot read content of file type %s from file %s" MESSAGE_CANNOT_READ_STREAM_FORMATTER = (
MESSAGE_LOADING_FORMATTER = "The plugin for file type %s is not installed. Please install %s" "Cannot read content of file type %s from stream"
)
MESSAGE_CANNOT_WRITE_FILE_TYPE_FORMATTER = (
"Cannot write content of file type %s to file %s"
)
MESSAGE_CANNOT_READ_FILE_TYPE_FORMATTER = (
"Cannot read content of file type %s from file %s"
)
MESSAGE_LOADING_FORMATTER = (
"The plugin for file type %s is not installed. Please install %s"
)
MESSAGE_EMPTY_ARRAY = "One empty row is found" MESSAGE_EMPTY_ARRAY = "One empty row is found"
MESSAGE_IGNORE_ROW = "One row is ignored" MESSAGE_IGNORE_ROW = "One row is ignored"
MESSAGE_DB_EXCEPTION = """ MESSAGE_DB_EXCEPTION = """
@ -47,5 +57,5 @@ STOP_ITERATION = 1
DEFAULT_MULTI_CSV_SEPARATOR = "__" DEFAULT_MULTI_CSV_SEPARATOR = "__"
SEPARATOR_FORMATTER = "---%s---" % DEFAULT_NAME + "%s" SEPARATOR_FORMATTER = "---%s---" % DEFAULT_NAME + "%s"
SEPARATOR_MATCHER = "---%s:(.*)---" % DEFAULT_NAME SEPARATOR_MATCHER = "---%s:(.*)---" % DEFAULT_NAME
DEFAULT_CSV_STREAM_FILE_FORMATTER = ("---%s:" % DEFAULT_NAME + "%s---%s") DEFAULT_CSV_STREAM_FILE_FORMATTER = "---%s:" % DEFAULT_NAME + "%s---%s"
DEFAULT_CSV_NEWLINE = "\r\n" DEFAULT_CSV_NEWLINE = "\r\n"

View File

@ -81,8 +81,7 @@ class DjangoBookWriter(BookWriter):
) )
else: else:
raise Exception( raise Exception(
"Sheet: %s does not match any given models." "Sheet: %s does not match any given models." % sheet_name
% sheet_name
+ "Please be aware of case sensitivity." + "Please be aware of case sensitivity."
) )

View File

@ -18,6 +18,7 @@ class PyexcelSQLSkipRowException(Exception):
Raised this exception to skipping a row Raised this exception to skipping a row
while data import while data import
""" """
pass pass
@ -93,8 +94,7 @@ class SQLBookWriter(BookWriter):
) )
else: else:
raise Exception( raise Exception(
"Sheet: %s does not match any given tables." "Sheet: %s does not match any given tables." % sheet_name
% sheet_name
+ "Please be aware of case sensitivity." + "Please be aware of case sensitivity."
) )

View File

@ -11,14 +11,17 @@
class NoSupportingPluginFound(Exception): class NoSupportingPluginFound(Exception):
"""raised when an known file extension is seen""" """raised when an known file extension is seen"""
pass pass
class SupportingPluginAvailableButNotInstalled(Exception): class SupportingPluginAvailableButNotInstalled(Exception):
"""raised when a known plugin is not installed""" """raised when a known plugin is not installed"""
pass pass
class UpgradePlugin(Exception): class UpgradePlugin(Exception):
"""raised when a known plugin is not compatible""" """raised when a known plugin is not compatible"""
pass pass

View File

@ -23,6 +23,8 @@ def iget_data(afile, file_type=None, **keywords):
:param sheet_index: the index of the sheet to be loaded :param sheet_index: the index of the sheet to be loaded
:param sheets: a list of sheet to be loaded :param sheets: a list of sheet to be loaded
:param file_type: used only when filename is not a physial file name :param file_type: used only when filename is not a physial file name
:param force_file_type: used only when filename refers to a physical file
and it is intended to open it as forced file type.
:param streaming: toggles the type of returned data. The values of the :param streaming: toggles the type of returned data. The values of the
returned dictionary remain as generator if it is set returned dictionary remain as generator if it is set
to True. Default is False. to True. Default is False.
@ -147,6 +149,7 @@ def load_data(
file_content=None, file_content=None,
file_stream=None, file_stream=None,
file_type=None, file_type=None,
force_file_type=None,
sheet_name=None, sheet_name=None,
sheet_index=None, sheet_index=None,
sheets=None, sheets=None,
@ -158,6 +161,8 @@ def load_data(
:param filename: actual file name, a file stream or actual content :param filename: actual file name, a file stream or actual content
:param file_type: used only when filename is not a physial file name :param file_type: used only when filename is not a physial file name
:param force_file_type: used only when filename refers to a physical file
and it is intended to open it as forced file type.
:param sheet_name: the name of the sheet to be loaded :param sheet_name: the name of the sheet to be loaded
:param sheet_index: the index of the sheet to be loaded :param sheet_index: the index of the sheet to be loaded
:param keywords: any other parameters :param keywords: any other parameters
@ -169,10 +174,13 @@ def load_data(
raise IOError(constants.MESSAGE_ERROR_02) raise IOError(constants.MESSAGE_ERROR_02)
if file_type is None: if file_type is None:
try: if force_file_type:
file_type = file_name.split(".")[-1] file_type = force_file_type
except AttributeError: else:
raise Exception("file_name should be a string type") try:
file_type = file_name.split(".")[-1]
except AttributeError:
raise Exception("file_name should be a string type")
reader = READERS.get_a_plugin(file_type, library) reader = READERS.get_a_plugin(file_type, library)
if file_name: if file_name:

View File

@ -98,7 +98,9 @@ class IOManager(PluginManager):
message = "Please install " message = "Please install "
if len(plugins) > 1: if len(plugins) > 1:
message += ERROR_MESSAGE_FORMATTER % ( message += ERROR_MESSAGE_FORMATTER % (
self.action, file_type, ",".join(plugins) self.action,
file_type,
",".join(plugins),
) )
else: else:
message += plugins[0] message += plugins[0]

View File

@ -84,7 +84,7 @@ class CSVMemoryMapIterator(compact.Iterator):
if bom_header == BOM_BIG_ENDIAN: if bom_header == BOM_BIG_ENDIAN:
self.__endian = BIG_ENDIAN self.__endian = BIG_ENDIAN
elif self.__endian == LITTLE_ENDIAN: elif self.__endian == LITTLE_ENDIAN:
line = line[self.__zeros_left_in_2_row:] line = line[self.__zeros_left_in_2_row :] # flake8: noqa
if self.__endian == LITTLE_ENDIAN: if self.__endian == LITTLE_ENDIAN:
line = line.rstrip() line = line.rstrip()
line = line.decode(self.__encoding) line = line.decode(self.__encoding)
@ -168,14 +168,14 @@ class CSVSheetReader(SheetReader):
ret = service.detect_int_value(csv_cell_text, self.__pep_0515_off) ret = service.detect_int_value(csv_cell_text, self.__pep_0515_off)
if ret is None and self.__auto_detect_float: if ret is None and self.__auto_detect_float:
ret = service.detect_float_value( ret = service.detect_float_value(
csv_cell_text, self.__pep_0515_off, csv_cell_text,
self.__pep_0515_off,
ignore_nan_text=self.__ignore_nan_text, ignore_nan_text=self.__ignore_nan_text,
default_float_nan=self.__default_float_nan default_float_nan=self.__default_float_nan,
) )
shall_we_ignore_the_conversion = ( shall_we_ignore_the_conversion = (
(ret in [float("inf"), float("-inf")]) ret in [float("inf"), float("-inf")]
and self.__ignore_infinity ) and self.__ignore_infinity
)
if shall_we_ignore_the_conversion: if shall_we_ignore_the_conversion:
ret = None ret = None
if ret is None and self.__auto_detect_datetime: if ret is None and self.__auto_detect_datetime:

View File

@ -40,9 +40,8 @@ def detect_date_value(cell_text):
def detect_float_value( def detect_float_value(
cell_text, pep_0515_off=True, cell_text, pep_0515_off=True, ignore_nan_text=False, default_float_nan=None
ignore_nan_text=False, ):
default_float_nan=None):
should_we_skip_it = ( should_we_skip_it = (
cell_text.startswith("0") and cell_text.startswith("0.") is False cell_text.startswith("0") and cell_text.startswith("0.") is False
) )

View File

@ -11,7 +11,7 @@ PY26 = PY2 and sys.version_info[1] < 7
NAME = 'pyexcel-io' NAME = 'pyexcel-io'
AUTHOR = 'C.W.' AUTHOR = 'C.W.'
VERSION = '0.5.8' VERSION = '0.5.9'
EMAIL = 'wangc_2011@hotmail.com' EMAIL = 'wangc_2011@hotmail.com'
LICENSE = 'New BSD' LICENSE = 'New BSD'
DESCRIPTION = ( DESCRIPTION = (
@ -19,7 +19,7 @@ DESCRIPTION = (
'format and to/from databases' 'format and to/from databases'
) )
URL = 'https://github.com/pyexcel/pyexcel-io' URL = 'https://github.com/pyexcel/pyexcel-io'
DOWNLOAD_URL = '%s/archive/0.5.8.tar.gz' % URL DOWNLOAD_URL = '%s/archive/0.5.9.tar.gz' % URL
FILES = ['README.rst', 'CHANGELOG.rst'] FILES = ['README.rst', 'CHANGELOG.rst']
KEYWORDS = [ KEYWORDS = [
'API', 'API',
@ -63,8 +63,8 @@ EXTRAS_REQUIRE = {
# You do not need to read beyond this line # You do not need to read beyond this line
PUBLISH_COMMAND = '{0} setup.py sdist bdist_wheel upload -r pypi'.format( PUBLISH_COMMAND = '{0} setup.py sdist bdist_wheel upload -r pypi'.format(
sys.executable) sys.executable)
GS_COMMAND = ('gs pyexcel-io v0.5.8 ' + GS_COMMAND = ('gs pyexcel-io v0.5.9 ' +
"Find 0.5.8 in changelog for more details") "Find 0.5.9 in changelog for more details")
NO_GS_MESSAGE = ('Automatic github release is disabled. ' + NO_GS_MESSAGE = ('Automatic github release is disabled. ' +
'Please install gease to enable it.') 'Please install gease to enable it.')
UPLOAD_FAILED_MSG = ( UPLOAD_FAILED_MSG = (

View File

@ -1,2 +1,2 @@
pip freeze pip freeze
nosetests --with-coverage --cover-package pyexcel_io --cover-package tests --with-doctest --doctest-extension=.rst README.rst docs/source pyexcel_io && flake8 . --exclude=.moban.d,docs --builtins=unicode,xrange,long nosetests --with-coverage --cover-package pyexcel_io --cover-package tests tests --with-doctest --doctest-extension=.rst README.rst docs/source pyexcel_io && flake8 . --exclude=.moban.d,docs --builtins=unicode,xrange,long

View File

@ -1,2 +1,2 @@
pip freeze pip freeze
nosetests --with-coverage --cover-package pyexcel_io --cover-package tests --with-doctest --doctest-extension=.rst README.rst docs/source pyexcel_io && flake8 . --exclude=.moban.d,docs --builtins=unicode,xrange,long nosetests --with-coverage --cover-package pyexcel_io --cover-package tests tests --with-doctest --doctest-extension=.rst README.rst docs/source pyexcel_io && flake8 . --exclude=.moban.d,docs --builtins=unicode,xrange,long

1
tests/fixtures/force_file_type.txt vendored Normal file
View File

@ -0,0 +1 @@
1,2,3

View File

@ -1,6 +1,4 @@
from pyexcel_io.sheet import ( from pyexcel_io.sheet import SheetReader, SheetWriter, NamedContent
SheetReader, SheetWriter, NamedContent
)
from pyexcel_io.book import BookWriter from pyexcel_io.book import BookWriter
from pyexcel_io.utils import is_empty_array from pyexcel_io.utils import is_empty_array
from nose.tools import raises from nose.tools import raises
@ -28,7 +26,6 @@ class ArrayWriter(SheetWriter):
class TestSheetReader: class TestSheetReader:
@raises(Exception) @raises(Exception)
def test_abstractness(self): def test_abstractness(self):
reader = SheetReader("test") reader = SheetReader("test")
@ -54,13 +51,13 @@ class TestSheetReader:
def to_array(self): def to_array(self):
pass pass
b = B(name) b = B(name)
b.to_array() b.to_array()
assert b.name == name assert b.name == name
class TestSheetWriter: class TestSheetWriter:
@raises(NotImplementedError) @raises(NotImplementedError)
def test_abstractness(self): def test_abstractness(self):
writer = SheetWriter("te", "st", "abstract") writer = SheetWriter("te", "st", "abstract")
@ -71,16 +68,12 @@ class TestSheetWriter:
def write_row(self, row): def write_row(self, row):
pass pass
d = D('t', 'e', 's') d = D("t", "e", "s")
d.write_row([11, 11]) d.write_row([11, 11])
def test_writer(self): def test_writer(self):
native_sheet = NamedContent("test", []) native_sheet = NamedContent("test", [])
content = [ content = [[1, 2], [3, 4], [5, 6]]
[1, 2],
[3, 4],
[5, 6]
]
writer = ArrayWriter(None, native_sheet, "test") writer = ArrayWriter(None, native_sheet, "test")
writer.write_row(content[0]) writer.write_row(content[0])
writer.write_array(content[1:]) writer.write_array(content[1:])
@ -88,11 +81,7 @@ class TestSheetWriter:
def test_writer2(self): def test_writer2(self):
native_sheet = NamedContent("test", []) native_sheet = NamedContent("test", [])
content = [ content = [[1, 2], [3, 4], [5, 6]]
[1, 2],
[3, 4],
[5, 6]
]
writer = ArrayWriter(None, native_sheet, None) writer = ArrayWriter(None, native_sheet, None)
writer.write_row(content[0]) writer.write_row(content[0])
writer.write_array(content[1:]) writer.write_array(content[1:])

View File

@ -10,12 +10,9 @@ from pyexcel_io.sheet import NamedContent
from pyexcel_io.readers.csvr import ( from pyexcel_io.readers.csvr import (
CSVSheetReader, CSVSheetReader,
CSVFileReader, CSVFileReader,
CSVinMemoryReader CSVinMemoryReader,
)
from pyexcel_io.writers.csvw import (
CSVFileWriter,
CSVMemoryWriter
) )
from pyexcel_io.writers.csvw import CSVFileWriter, CSVMemoryWriter
from pyexcel_io._compact import BytesIO, PY2, StringIO from pyexcel_io._compact import BytesIO, PY2, StringIO
@ -23,17 +20,9 @@ class TestReaders(TestCase):
def setUp(self): def setUp(self):
self.file_type = "csv" self.file_type = "csv"
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"], self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
["4", "5", "6"], with open(self.test_file, "w") as f:
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
with open(self.test_file, 'w') as f:
for row in self.data: for row in self.data:
f.write(",".join(row) + "\n") f.write(",".join(row) + "\n")
@ -49,7 +38,7 @@ class TestReaders(TestCase):
def test_sheet_memory_reader(self): def test_sheet_memory_reader(self):
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
with open(self.test_file, 'r') as f: with open(self.test_file, "r") as f:
io.write(f.read()) io.write(f.read())
io.seek(0) io.seek(0)
r = CSVinMemoryReader(NamedContent(self.file_type, io)) r = CSVinMemoryReader(NamedContent(self.file_type, io))
@ -64,25 +53,23 @@ class TestWriter(TestCase):
def setUp(self): def setUp(self):
self.file_type = "csv" self.file_type = "csv"
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[1, 2, 3], self.result = dedent(
[4, 5, 6], """
[7, 8, 9]
]
self.result = dedent("""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,9 7,8,9
""").strip('\n') """
).strip("\n")
def test_sheet_writer(self): def test_sheet_writer(self):
w = CSVFileWriter(self.test_file, None) w = CSVFileWriter(self.test_file, None)
for row in self.data: for row in self.data:
w.write_row(row) w.write_row(row)
w.close() w.close()
with open(self.test_file, 'r') as f: with open(self.test_file, "r") as f:
content = f.read().replace('\r', '') content = f.read().replace("\r", "")
self.assertEqual(content.strip('\n'), self.result) self.assertEqual(content.strip("\n"), self.result)
def tearDown(self): def tearDown(self):
os.unlink(self.test_file) os.unlink(self.test_file)
@ -92,16 +79,14 @@ class TestMemoryWriter(TestCase):
def setUp(self): def setUp(self):
self.file_type = "csv" self.file_type = "csv"
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[1, 2, 3], self.result = dedent(
[4, 5, 6], """
[7, 8, 9]
]
self.result = dedent("""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,9 7,8,9
""").strip('\n') """
).strip("\n")
def test_sheet_writer_to_memory(self): def test_sheet_writer_to_memory(self):
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
@ -109,31 +94,23 @@ class TestMemoryWriter(TestCase):
for row in self.data: for row in self.data:
w.write_row(row) w.write_row(row)
w.close() w.close()
content = io.getvalue().replace('\r', '') content = io.getvalue().replace("\r", "")
self.assertEqual(content.strip('\n'), self.result) self.assertEqual(content.strip("\n"), self.result)
class TestNonUniformCSV(TestCase): class TestNonUniformCSV(TestCase):
def setUp(self): def setUp(self):
self.file_type = "csv" self.file_type = "csv"
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [["1"], ["4", "5", "6", "", ""], ["", "7"]]
["1"], with open(self.test_file, "w") as f:
["4", "5", "6", "", ""],
["", "7"]
]
with open(self.test_file, 'w') as f:
for row in self.data: for row in self.data:
f.write(",".join(row) + "\n") f.write(",".join(row) + "\n")
def test_sheet_file_reader(self): def test_sheet_file_reader(self):
r = CSVFileReader(NamedContent(self.file_type, self.test_file)) r = CSVFileReader(NamedContent(self.file_type, self.test_file))
result = list(r.to_array()) result = list(r.to_array())
self.assertEqual(result, [ self.assertEqual(result, [[1], [4, 5, 6], ["", 7]])
[1],
[4, 5, 6],
["", 7]
])
def tearDown(self): def tearDown(self):
os.unlink(self.test_file) os.unlink(self.test_file)
@ -141,53 +118,55 @@ class TestNonUniformCSV(TestCase):
def test_utf16_decoding(): def test_utf16_decoding():
test_file = os.path.join("tests", "fixtures", "csv-encoding-utf16.csv") test_file = os.path.join("tests", "fixtures", "csv-encoding-utf16.csv")
reader = CSVFileReader( reader = CSVFileReader(NamedContent("csv", test_file), encoding="utf-16")
NamedContent('csv', test_file),
encoding="utf-16")
content = list(reader.to_array()) content = list(reader.to_array())
if PY2: if PY2:
content[0] = [s.encode('utf-8') for s in content[0]] content[0] = [s.encode("utf-8") for s in content[0]]
expected = [['Äkkilähdöt', 'Matkakirjoituksia', 'Matkatoimistot']] expected = [["Äkkilähdöt", "Matkakirjoituksia", "Matkatoimistot"]]
eq_(content, expected) eq_(content, expected)
def test_utf16_encoding(): def test_utf16_encoding():
content = [[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']] content = [[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"]]
test_file = "test-utf16-encoding.csv" test_file = "test-utf16-encoding.csv"
writer = CSVFileWriter( writer = CSVFileWriter(
test_file, None, test_file, None, encoding="utf-16", lineterminator="\n"
encoding="utf-16", lineterminator="\n") )
writer.write_array(content) writer.write_array(content)
writer.close() writer.close()
with open(test_file, "rb") as f: with open(test_file, "rb") as f:
actual = f.read().decode('utf-16') actual = f.read().decode("utf-16")
if PY2: if PY2:
actual = actual.encode('utf-8') actual = actual.encode("utf-8")
eq_(actual, 'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n') eq_(actual, "Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n")
os.unlink(test_file) os.unlink(test_file)
def test_utf16_memory_decoding(): def test_utf16_memory_decoding():
test_content = u'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot' test_content = u"Äkkilähdöt,Matkakirjoituksia,Matkatoimistot"
test_content = BytesIO(test_content.encode('utf-16')) test_content = BytesIO(test_content.encode("utf-16"))
reader = CSVinMemoryReader( reader = CSVinMemoryReader(
NamedContent('csv', test_content), NamedContent("csv", test_content), encoding="utf-16"
encoding="utf-16") )
content = list(reader.to_array()) content = list(reader.to_array())
if PY2: if PY2:
content[0] = [s.encode('utf-8') for s in content[0]] content[0] = [s.encode("utf-8") for s in content[0]]
expected = [['Äkkilähdöt', 'Matkakirjoituksia', 'Matkatoimistot']] expected = [["Äkkilähdöt", "Matkakirjoituksia", "Matkatoimistot"]]
eq_(content, expected) eq_(content, expected)
def test_utf16_memory_encoding(): def test_utf16_memory_encoding():
content = [[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']] content = [[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"]]
io = StringIO() io = StringIO()
writer = CSVMemoryWriter( writer = CSVMemoryWriter(
io, None, lineterminator="\n", single_sheet_in_book=True, io,
encoding="utf-16") None,
lineterminator="\n",
single_sheet_in_book=True,
encoding="utf-16",
)
writer.write_array(content) writer.write_array(content)
actual = io.getvalue() actual = io.getvalue()
if PY2: if PY2:
actual = actual.decode('utf-16') actual = actual.decode("utf-16")
eq_(actual, u'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n') eq_(actual, u"Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n")

View File

@ -6,15 +6,15 @@ from pyexcel_io.database.common import (
DjangoModelImporter, DjangoModelImporter,
DjangoModelImportAdapter, DjangoModelImportAdapter,
DjangoModelExporter, DjangoModelExporter,
DjangoModelExportAdapter DjangoModelExportAdapter,
) )
from pyexcel_io.database.importers.django import ( from pyexcel_io.database.importers.django import (
DjangoModelWriter, DjangoModelWriter,
DjangoBookWriter DjangoBookWriter,
) )
from pyexcel_io.database.exporters.django import ( from pyexcel_io.database.exporters.django import (
DjangoModelReader, DjangoModelReader,
DjangoBookReader DjangoBookReader,
) )
@ -92,21 +92,13 @@ class FakeExceptionDjangoModel(FakeDjangoModel):
self.raiseException = raiseException self.raiseException = raiseException
def __call__(self, **keywords): def __call__(self, **keywords):
return Package(raiseException=self.raiseException, return Package(raiseException=self.raiseException, **keywords)
**keywords)
class TestException: class TestException:
def setUp(self): def setUp(self):
self.data = [ self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
["X", "Y", "Z"], self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
@raises(Exception) @raises(Exception)
def test_bulk_save_to_django_model_with_exception(self): def test_bulk_save_to_django_model_with_exception(self):
@ -129,15 +121,8 @@ class TestException:
class TestSheet: class TestSheet:
def setUp(self): def setUp(self):
self.data = [ self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
["X", "Y", "Z"], self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
def test_sheet_save_to_django_model(self): def test_sheet_save_to_django_model(self):
model = FakeDjangoModel() model = FakeDjangoModel()
@ -150,12 +135,7 @@ class TestSheet:
def test_sheet_save_to_django_model_with_empty_array(self): def test_sheet_save_to_django_model_with_empty_array(self):
model = FakeDjangoModel() model = FakeDjangoModel()
data = [ data = [["X", "Y", "Z"], ["", "", ""], [1, 2, 3], [4, 5, 6]]
["X", "Y", "Z"],
['', '', ''],
[1, 2, 3],
[4, 5, 6]
]
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
writer = DjangoModelWriter(None, adapter) writer = DjangoModelWriter(None, adapter)
@ -169,6 +149,7 @@ class TestSheet:
def wrapper(row): def wrapper(row):
row[0] = row[0] + 1 row[0] = row[0] + 1
return row return row
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
adapter.row_initializer = wrapper adapter.row_initializer = wrapper
@ -176,8 +157,8 @@ class TestSheet:
writer.write_array(self.data[1:]) writer.write_array(self.data[1:])
writer.close() writer.close()
assert model.objects.objs == [ assert model.objects.objs == [
{'Y': 2, 'X': 2, 'Z': 3}, {"Y": 2, "X": 2, "Z": 3},
{'Y': 5, 'X': 5, 'Z': 6} {"Y": 5, "X": 5, "Z": 6},
] ]
def test_sheet_save_to_django_model_skip_me(self): def test_sheet_save_to_django_model_skip_me(self):
@ -188,15 +169,14 @@ class TestSheet:
return None return None
else: else:
return row return row
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
adapter.row_initializer = wrapper adapter.row_initializer = wrapper
writer = DjangoModelWriter(None, adapter) writer = DjangoModelWriter(None, adapter)
writer.write_array(self.data[1:]) writer.write_array(self.data[1:])
writer.close() writer.close()
assert model.objects.objs == [ assert model.objects.objs == [{"Y": 2, "X": 1, "Z": 3}]
{'Y': 2, 'X': 1, 'Z': 3}
]
def test_load_sheet_from_django_model(self): def test_load_sheet_from_django_model(self):
model = FakeDjangoModel() model = FakeDjangoModel()
@ -204,8 +184,9 @@ class TestSheet:
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
importer.append(adapter) importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]}, save_data(
file_type=DB_DJANGO) importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert model.objects.objs == self.result assert model.objects.objs == self.result
model._meta.update(["X", "Y", "Z"]) model._meta.update(["X", "Y", "Z"])
reader = DjangoModelReader(model) reader = DjangoModelReader(model)
@ -218,30 +199,23 @@ class TestSheet:
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
importer.append(adapter) importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]}, save_data(
file_type=DB_DJANGO) importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert model.objects.objs == self.result assert model.objects.objs == self.result
model._meta.update(["X", "Y", "Z"]) model._meta.update(["X", "Y", "Z"])
def row_renderer(row): def row_renderer(row):
return [str(element) for element in row] return [str(element) for element in row]
# the key point of this test case # the key point of this test case
reader = DjangoModelReader(model, reader = DjangoModelReader(model, row_renderer=row_renderer)
row_renderer=row_renderer)
data = reader.to_array() data = reader.to_array()
expected = [ expected = [["X", "Y", "Z"], ["1", "2", "3"], ["4", "5", "6"]]
["X", "Y", "Z"],
['1', '2', '3'],
['4', '5', '6']
]
eq_(list(data), expected) eq_(list(data), expected)
def test_mapping_array(self): def test_mapping_array(self):
data2 = [ data2 = [["A", "B", "C"], [1, 2, 3], [4, 5, 6]]
["A", "B", "C"],
[1, 2, 3],
[4, 5, 6]
]
model = FakeDjangoModel() model = FakeDjangoModel()
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = data2[0] adapter.column_names = data2[0]
@ -252,16 +226,8 @@ class TestSheet:
eq_(model.objects.objs, self.result) eq_(model.objects.objs, self.result)
def test_mapping_dict(self): def test_mapping_dict(self):
data2 = [ data2 = [["A", "B", "C"], [1, 2, 3], [4, 5, 6]]
["A", "B", "C"], mapdict = {"C": "Z", "A": "X", "B": "Y"}
[1, 2, 3],
[4, 5, 6]
]
mapdict = {
"C": "Z",
"A": "X",
"B": "Y"
}
model = FakeDjangoModel() model = FakeDjangoModel()
adapter = DjangoModelImportAdapter(model) adapter = DjangoModelImportAdapter(model)
adapter.column_names = data2[0] adapter.column_names = data2[0]
@ -281,16 +247,22 @@ class TestSheet:
class TestMultipleModels: class TestMultipleModels:
def setUp(self): def setUp(self):
self.content = OrderedDict() self.content = OrderedDict()
self.content.update({ self.content.update(
"Sheet1": [[u'X', u'Y', u'Z'], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}) {"Sheet1": [[u"X", u"Y", u"Z"], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}
self.content.update({ )
"Sheet2": [[u'A', u'B', u'C'], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}) self.content.update(
self.result1 = [{'Y': 4, 'X': 1, 'Z': 7}, {"Sheet2": [[u"A", u"B", u"C"], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}
{'Y': 5, 'X': 2, 'Z': 8}, )
{'Y': 6, 'X': 3, 'Z': 9}] self.result1 = [
self.result2 = [{'B': 4, 'A': 1, 'C': 7}, {"Y": 4, "X": 1, "Z": 7},
{'B': 5, 'A': 2, 'C': 8}, {"Y": 5, "X": 2, "Z": 8},
{'B': 6, 'A': 3, 'C': 9}] {"Y": 6, "X": 3, "Z": 9},
]
self.result2 = [
{"B": 4, "A": 1, "C": 7},
{"B": 5, "A": 2, "C": 8},
{"B": 6, "A": 3, "C": 9},
]
def test_save_to_more_models(self): def test_save_to_more_models(self):
sample_size = 10 sample_size = 10
@ -298,14 +270,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel() model2 = FakeDjangoModel()
importer = DjangoModelImporter() importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1) adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0] adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2) adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0] adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1) importer.append(adapter1)
importer.append(adapter2) importer.append(adapter2)
to_store = { to_store = {
adapter1.get_name(): self.content['Sheet1'][1:], adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content['Sheet2'][1:] adapter2.get_name(): self.content["Sheet2"][1:],
} }
writer = DjangoBookWriter() writer = DjangoBookWriter()
writer.open_content(importer, batch_size=sample_size) writer.open_content(importer, batch_size=sample_size)
@ -321,14 +293,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel() model2 = FakeDjangoModel()
importer = DjangoModelImporter() importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1) adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0] adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2) adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0] adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1) importer.append(adapter1)
importer.append(adapter2) importer.append(adapter2)
to_store = { to_store = {
adapter1.get_name(): self.content['Sheet1'][1:], adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content['Sheet2'][1:] adapter2.get_name(): self.content["Sheet2"][1:],
} }
writer = DjangoBookWriter() writer = DjangoBookWriter()
writer.open_content(importer, batch_size=sample_size, bulk_save=False) writer.open_content(importer, batch_size=sample_size, bulk_save=False)
@ -341,14 +313,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel() model2 = FakeDjangoModel()
importer = DjangoModelImporter() importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1) adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0] adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2) adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0] adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1) importer.append(adapter1)
importer.append(adapter2) importer.append(adapter2)
to_store = { to_store = {
adapter1.get_name(): self.content['Sheet1'][1:], adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content['Sheet2'][1:] adapter2.get_name(): self.content["Sheet2"][1:],
} }
save_data(importer, to_store, file_type=DB_DJANGO) save_data(importer, to_store, file_type=DB_DJANGO)
assert model1.objects.objs == self.result1 assert model1.objects.objs == self.result1
@ -374,11 +346,11 @@ class TestMultipleModels:
model1 = FakeDjangoModel() model1 = FakeDjangoModel()
importer = DjangoModelImporter() importer = DjangoModelImporter()
adapter = DjangoModelImportAdapter(model1) adapter = DjangoModelImportAdapter(model1)
adapter.column_names = self.content['Sheet1'][0] adapter.column_names = self.content["Sheet1"][0]
importer.append(adapter) importer.append(adapter)
to_store = { to_store = {
adapter.get_name(): self.content['Sheet1'][1:], adapter.get_name(): self.content["Sheet1"][1:],
"Sheet2": self.content['Sheet2'][1:] "Sheet2": self.content["Sheet2"][1:],
} }
save_data(importer, to_store, file_type=DB_DJANGO) save_data(importer, to_store, file_type=DB_DJANGO)
assert model1.objects.objs == self.result1 assert model1.objects.objs == self.result1
@ -390,7 +362,7 @@ class TestMultipleModels:
reader = DjangoBookReader() reader = DjangoBookReader()
reader.open_content(exporter) reader.open_content(exporter)
data = reader.read_all() data = reader.read_all()
assert list(data['Sheet1']) == self.content['Sheet1'] assert list(data["Sheet1"]) == self.content["Sheet1"]
@raises(TypeError) @raises(TypeError)
@ -407,29 +379,23 @@ def test_not_implemented_method_2():
class TestFilter: class TestFilter:
def setUp(self): def setUp(self):
self.data = [ self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
["X", "Y", "Z"], self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
self.model = FakeDjangoModel() self.model = FakeDjangoModel()
importer = DjangoModelImporter() importer = DjangoModelImporter()
adapter = DjangoModelImportAdapter(self.model) adapter = DjangoModelImportAdapter(self.model)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
importer.append(adapter) importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]}, save_data(
file_type=DB_DJANGO) importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert self.model.objects.objs == self.result assert self.model.objects.objs == self.result
self.model._meta.update(["X", "Y", "Z"]) self.model._meta.update(["X", "Y", "Z"])
def test_load_sheet_from_django_model_with_filter(self): def test_load_sheet_from_django_model_with_filter(self):
reader = DjangoModelReader(self.model, start_row=0, row_limit=2) reader = DjangoModelReader(self.model, start_row=0, row_limit=2)
data = reader.to_array() data = reader.to_array()
expected = [['X', 'Y', 'Z'], [1, 2, 3]] expected = [["X", "Y", "Z"], [1, 2, 3]]
eq_(list(data), expected) eq_(list(data), expected)
def test_load_sheet_from_django_model_with_filter_1(self): def test_load_sheet_from_django_model_with_filter_1(self):
@ -441,18 +407,18 @@ class TestFilter:
def test_load_sheet_from_django_model_with_filter_2(self): def test_load_sheet_from_django_model_with_filter_2(self):
reader = DjangoModelReader(self.model, start_column=1) reader = DjangoModelReader(self.model, start_column=1)
data = reader.to_array() data = reader.to_array()
expected = [['Y', 'Z'], [2, 3], [5, 6]] expected = [["Y", "Z"], [2, 3], [5, 6]]
eq_(list(data), expected) eq_(list(data), expected)
def test_load_sheet_from_django_model_with_filter_3(self): def test_load_sheet_from_django_model_with_filter_3(self):
reader = DjangoModelReader(self.model, start_column=1, column_limit=1) reader = DjangoModelReader(self.model, start_column=1, column_limit=1)
data = reader.to_array() data = reader.to_array()
expected = [['Y'], [2], [5]] expected = [["Y"], [2], [5]]
eq_(list(data), expected) eq_(list(data), expected)
def test_django_model_import_adapter(): def test_django_model_import_adapter():
adapter = DjangoModelImportAdapter(FakeDjangoModel) adapter = DjangoModelImportAdapter(FakeDjangoModel)
adapter.column_names = ['a'] adapter.column_names = ["a"]
adapter.row_initializer = "abc" adapter.row_initializer = "abc"
eq_(adapter.row_initializer, "abc") eq_(adapter.row_initializer, "abc")

View File

@ -7,17 +7,18 @@ import pyexcel_io.constants as constants
def test_index_filter(): def test_index_filter():
current_index, start, limit, expected = (0, 1, -1, current_index, start, limit, expected = (0, 1, -1, constants.SKIP_DATA)
constants.SKIP_DATA)
eq_(_index_filter(current_index, start, limit), expected) eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (2, 1, -1, current_index, start, limit, expected = (2, 1, -1, constants.TAKE_DATA)
constants.TAKE_DATA)
eq_(_index_filter(current_index, start, limit), expected) eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (2, 1, 10, current_index, start, limit, expected = (2, 1, 10, constants.TAKE_DATA)
constants.TAKE_DATA)
eq_(_index_filter(current_index, start, limit), expected) eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (100, 1, 10, current_index, start, limit, expected = (
constants.STOP_ITERATION) 100,
1,
10,
constants.STOP_ITERATION,
)
eq_(_index_filter(current_index, start, limit), expected) eq_(_index_filter(current_index, start, limit), expected)
@ -30,7 +31,7 @@ class TestFilter:
[3, 23, 33], [3, 23, 33],
[4, 24, 34], [4, 24, 34],
[5, 25, 35], [5, 25, 35],
[6, 26, 36] [6, 26, 36],
] ]
save_data(self.test_file, sample) save_data(self.test_file, sample)
@ -46,26 +47,29 @@ class TestFilter:
def test_filter_column(self): def test_filter_column(self):
filtered_data = get_data(self.test_file, start_column=1) filtered_data = get_data(self.test_file, start_column=1)
expected = [[21, 31], [22, 32], [23, 33], expected = [[21, 31], [22, 32], [23, 33], [24, 34], [25, 35], [26, 36]]
[24, 34], [25, 35], [26, 36]]
eq_(filtered_data[self.test_file], expected) eq_(filtered_data[self.test_file], expected)
def test_filter_column_2(self): def test_filter_column_2(self):
filtered_data = get_data(self.test_file, filtered_data = get_data(
start_column=1, column_limit=1) self.test_file, start_column=1, column_limit=1
)
expected = [[21], [22], [23], [24], [25], [26]] expected = [[21], [22], [23], [24], [25], [26]]
eq_(filtered_data[self.test_file], expected) eq_(filtered_data[self.test_file], expected)
def test_filter_both_ways(self): def test_filter_both_ways(self):
filtered_data = get_data(self.test_file, filtered_data = get_data(self.test_file, start_column=1, start_row=3)
start_column=1, start_row=3)
expected = [[24, 34], [25, 35], [26, 36]] expected = [[24, 34], [25, 35], [26, 36]]
eq_(filtered_data[self.test_file], expected) eq_(filtered_data[self.test_file], expected)
def test_filter_both_ways_2(self): def test_filter_both_ways_2(self):
filtered_data = get_data(self.test_file, filtered_data = get_data(
start_column=1, column_limit=1, self.test_file,
start_row=3, row_limit=1) start_column=1,
column_limit=1,
start_row=3,
row_limit=1,
)
expected = [[24]] expected = [[24]]
eq_(filtered_data[self.test_file], expected) eq_(filtered_data[self.test_file], expected)

View File

@ -15,6 +15,15 @@ from zipfile import BadZipfile
PY2 = sys.version_info[0] == 2 PY2 = sys.version_info[0] == 2
def test_force_file_type():
test_file = "force_file_type.txt"
data = get_data(
os.path.join("tests", "fixtures", test_file), force_file_type="csv"
)
expected = [[1, 2, 3]]
eq_(expected, data[test_file])
@raises(IOError) @raises(IOError)
def test_no_valid_parameters(): def test_no_valid_parameters():
load_data() load_data()
@ -65,7 +74,7 @@ def test_load_ods_data_from_memory():
def test_write_xlsx_data_to_memory(): def test_write_xlsx_data_to_memory():
data = {'Sheet': [[1]]} data = {"Sheet": [[1]]}
io = BytesIO() io = BytesIO()
msg = "Please install one of these plugins for write data in 'xlsx': " msg = "Please install one of these plugins for write data in 'xlsx': "
msg += "pyexcel-xlsx,pyexcel-xlsxw" msg += "pyexcel-xlsx,pyexcel-xlsxw"
@ -110,7 +119,7 @@ def test_writer_csvz_data_from_memory():
if not PY2: if not PY2:
io = StringIO() io = StringIO()
writer = get_writer(io, file_type="csvz") writer = get_writer(io, file_type="csvz")
writer.write({'adb': [[2, 3]]}) writer.write({"adb": [[2, 3]]})
else: else:
raise Exception("pass it") raise Exception("pass it")
@ -125,7 +134,7 @@ def test_writer_xlsm_data_from_memory2():
def test_writer_unknown_data_from_memory2(): def test_writer_unknown_data_from_memory2():
io = BytesIO() io = BytesIO()
# mock it # mock it
manager.register_stream_type('unknown1', 'text') manager.register_stream_type("unknown1", "text")
get_writer(io, file_type="unknown1") get_writer(io, file_type="unknown1")
@ -138,115 +147,115 @@ def test_get_io_type():
t = manager.get_io_type("hello") t = manager.get_io_type("hello")
assert t is None assert t is None
t = manager.get_io_type("csv") t = manager.get_io_type("csv")
eq_(t, 'string') eq_(t, "string")
t = manager.get_io_type("xls") t = manager.get_io_type("xls")
eq_(t, 'bytes') eq_(t, "bytes")
def test_default_csv_format(): def test_default_csv_format():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("csv") io = manager.get_io("csv")
# test default format for saving is 'csv' # test default format for saving is 'csv'
save_data(io, data) save_data(io, data)
io.seek(0) io.seek(0)
# test default format for reading is 'csv' # test default format for reading is 'csv'
result = get_data(io) result = get_data(io)
assert result['csv'] == [[1, 2, 3]] assert result["csv"] == [[1, 2, 3]]
def test_case_insentivity(): def test_case_insentivity():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("CSV") io = manager.get_io("CSV")
# test default format for saving is 'csv' # test default format for saving is 'csv'
save_data(io, data) save_data(io, data)
io.seek(0) io.seek(0)
# test default format for reading is 'csv' # test default format for reading is 'csv'
result = get_data(io) result = get_data(io)
assert result['csv'] == [[1, 2, 3]] assert result["csv"] == [[1, 2, 3]]
def test_file_handle_as_input(): def test_file_handle_as_input():
test_file = "file_handle.csv" test_file = "file_handle.csv"
with open(test_file, 'w') as f: with open(test_file, "w") as f:
f.write("1,2,3") f.write("1,2,3")
with open(test_file, 'r') as f: with open(test_file, "r") as f:
data = get_data(f, 'csv') data = get_data(f, "csv")
eq_(data['csv'], [[1, 2, 3]]) eq_(data["csv"], [[1, 2, 3]])
def test_file_type_case_insensitivity(): def test_file_type_case_insensitivity():
test_file = "file_handle.CSv" test_file = "file_handle.CSv"
with open(test_file, 'w') as f: with open(test_file, "w") as f:
f.write("1,2,3") f.write("1,2,3")
with open(test_file, 'r') as f: with open(test_file, "r") as f:
data = get_data(f, 'csv') data = get_data(f, "csv")
eq_(data['csv'], [[1, 2, 3]]) eq_(data["csv"], [[1, 2, 3]])
def test_file_handle_as_output(): def test_file_handle_as_output():
test_file = "file_handle.csv" test_file = "file_handle.csv"
with open(test_file, 'w') as f: with open(test_file, "w") as f:
save_data(f, [[1, 2, 3]], 'csv', lineterminator='\n') save_data(f, [[1, 2, 3]], "csv", lineterminator="\n")
with open(test_file, 'r') as f: with open(test_file, "r") as f:
content = f.read() content = f.read()
eq_(content, '1,2,3\n') eq_(content, "1,2,3\n")
def test_binary_file_content(): def test_binary_file_content():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("csvz") io = manager.get_io("csvz")
save_data(io, data, 'csvz') save_data(io, data, "csvz")
result = get_data(io.getvalue(), 'csvz') result = get_data(io.getvalue(), "csvz")
eq_(result['pyexcel_sheet1'], [[1, 2, 3]]) eq_(result["pyexcel_sheet1"], [[1, 2, 3]])
def test_text_file_content(): def test_text_file_content():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("csv") io = manager.get_io("csv")
save_data(io, data, 'csv') save_data(io, data, "csv")
result = get_data(io.getvalue(), 'csv') result = get_data(io.getvalue(), "csv")
eq_(result['csv'], [[1, 2, 3]]) eq_(result["csv"], [[1, 2, 3]])
def test_library_parameter(): def test_library_parameter():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("csv") io = manager.get_io("csv")
save_data(io, data, 'csv', library="pyexcel-io") save_data(io, data, "csv", library="pyexcel-io")
result = get_data(io.getvalue(), 'csv', library="pyexcel-io") result = get_data(io.getvalue(), "csv", library="pyexcel-io")
eq_(result['csv'], [[1, 2, 3]]) eq_(result["csv"], [[1, 2, 3]])
@raises(Exception) @raises(Exception)
def test_library_parameter_error_situation(): def test_library_parameter_error_situation():
data = [['1', '2', '3']] data = [["1", "2", "3"]]
io = manager.get_io("csv") io = manager.get_io("csv")
save_data(io, data, 'csv', library="doesnot-exist") save_data(io, data, "csv", library="doesnot-exist")
def test_conversion_from_bytes_to_text(): def test_conversion_from_bytes_to_text():
test_file = "conversion.csv" test_file = "conversion.csv"
data = [['1', '2', '3']] data = [["1", "2", "3"]]
save_data(test_file, data) save_data(test_file, data)
with open(test_file, "rb") as f: with open(test_file, "rb") as f:
content = f.read() content = f.read()
result = get_data(content, 'csv') result = get_data(content, "csv")
assert result['csv'] == [[1, 2, 3]] assert result["csv"] == [[1, 2, 3]]
os.unlink(test_file) os.unlink(test_file)
def test_is_string(): def test_is_string():
if PY2: if PY2:
assert is_string(type(u'a')) is True assert is_string(type(u"a")) is True
else: else:
assert is_string(type('a')) is True assert is_string(type("a")) is True
def test_generator_is_obtained(): def test_generator_is_obtained():
data, reader = iget_data(os.path.join("tests", "fixtures", "test.csv")) data, reader = iget_data(os.path.join("tests", "fixtures", "test.csv"))
assert isinstance(data['test.csv'], types.GeneratorType) assert isinstance(data["test.csv"], types.GeneratorType)
reader.close() reader.close()
@ -258,27 +267,19 @@ def test_generator_can_be_written():
assert os.path.exists(test_filename) assert os.path.exists(test_filename)
data2 = get_data(test_filename) data2 = get_data(test_filename)
expected = get_data(test_fixture) expected = get_data(test_fixture)
assert data2[test_filename] == expected['test.csv'] assert data2[test_filename] == expected["test.csv"]
os.unlink(test_filename) os.unlink(test_filename)
class TestReadMultipleSheets(TestCase): class TestReadMultipleSheets(TestCase):
file_type = "csv" file_type = "csv"
delimiter = ',' delimiter = ","
def setUp(self): def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type self.merged_book_file = "csv_multiple." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"], self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.sheets = OrderedDict() self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data}) self.sheets.update({"sheet1": self.data})
self.sheets.update({"sheet2": self.data}) self.sheets.update({"sheet2": self.data})
@ -290,24 +291,24 @@ class TestReadMultipleSheets(TestCase):
index = 0 index = 0
for key, value in self.sheets.items(): for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index) file_name = self.test_file_formatter % (key, index)
with open(file_name, 'w') as f: with open(file_name, "w") as f:
for row in value: for row in value:
f.write(self.delimiter.join(row) + "\n") f.write(self.delimiter.join(row) + "\n")
index = index + 1 index = index + 1
def test_sheet_name(self): def test_sheet_name(self):
sheets = get_data(self.merged_book_file, sheet_name="sheet1") sheets = get_data(self.merged_book_file, sheet_name="sheet1")
eq_(sheets['sheet1'], self.expected_sheets['sheet1']) eq_(sheets["sheet1"], self.expected_sheets["sheet1"])
def test_sheet_index(self): def test_sheet_index(self):
sheets = get_data(self.merged_book_file, sheet_index=1) sheets = get_data(self.merged_book_file, sheet_index=1)
eq_(sheets['sheet2'], self.expected_sheets['sheet2']) eq_(sheets["sheet2"], self.expected_sheets["sheet2"])
def test_read_many(self): def test_read_many(self):
sheets = get_data(self.merged_book_file, sheets=['sheet1', 2]) sheets = get_data(self.merged_book_file, sheets=["sheet1", 2])
eq_(sheets['sheet1'], self.expected_sheets['sheet1']) eq_(sheets["sheet1"], self.expected_sheets["sheet1"])
eq_(sheets['sheet3'], self.expected_sheets['sheet3']) eq_(sheets["sheet3"], self.expected_sheets["sheet3"])
assert 'sheet2' not in sheets assert "sheet2" not in sheets
def tearDown(self): def tearDown(self):
index = 0 index = 0

View File

@ -8,18 +8,12 @@ from pyexcel_io import get_data, save_data
from pyexcel_io._compact import PY26 from pyexcel_io._compact import PY26
import pyexcel as p import pyexcel as p
IN_TRAVIS = 'TRAVIS' in os.environ IN_TRAVIS = "TRAVIS" in os.environ
def test_issue_8(): def test_issue_8():
test_file = "test_issue_8.csv" test_file = "test_issue_8.csv"
data = [ data = [[1, 2], [], [], [], [3, 4]]
[1, 2],
[],
[],
[],
[3, 4]
]
save_data(test_file, data) save_data(test_file, data)
written_data = get_data(test_file, skip_empty_rows=False) written_data = get_data(test_file, skip_empty_rows=False)
eq_(data, written_data[test_file]) eq_(data, written_data[test_file])
@ -29,21 +23,21 @@ def test_issue_8():
def test_issue_20(): def test_issue_20():
test_file = get_fixture("issue20.csv") test_file = get_fixture("issue20.csv")
data = get_data(test_file) data = get_data(test_file)
expected = [[u'to', u'infinity', u'and', u'beyond']] expected = [[u"to", u"infinity", u"and", u"beyond"]]
eq_(data['issue20.csv'], expected) eq_(data["issue20.csv"], expected)
def test_issue_23(): def test_issue_23():
test_file = get_fixture("issue23.csv") test_file = get_fixture("issue23.csv")
data = get_data(test_file) data = get_data(test_file)
expected = [ expected = [
[8204235414504252, u'inf'], [8204235414504252, u"inf"],
[82042354145042521, u'-inf'], [82042354145042521, u"-inf"],
[820423541450425216, 0], [820423541450425216, 0],
[820423541450425247, 1], [820423541450425247, 1],
[8204235414504252490, 1.1] [8204235414504252490, 1.1],
] ]
eq_(data['issue23.csv'], expected) eq_(data["issue23.csv"], expected)
# def test_issue_28(): # def test_issue_28():
@ -62,24 +56,26 @@ def test_issue_33_34():
pass pass
else: else:
import mmap import mmap
test_file = get_fixture("issue20.csv") test_file = get_fixture("issue20.csv")
with open(test_file, 'r+b') as f: with open(test_file, "r+b") as f:
memory_mapped_file = mmap.mmap( memory_mapped_file = mmap.mmap(
f.fileno(), 0, access=mmap.ACCESS_READ) f.fileno(), 0, access=mmap.ACCESS_READ
data = get_data(memory_mapped_file, file_type='csv') )
expected = [[u'to', u'infinity', u'and', u'beyond']] data = get_data(memory_mapped_file, file_type="csv")
eq_(data['csv'], expected) expected = [[u"to", u"infinity", u"and", u"beyond"]]
eq_(data["csv"], expected)
def test_issue_30_utf8_BOM_header(): def test_issue_30_utf8_BOM_header():
content = [[u'人有悲歡離合', u'月有陰晴圓缺']] content = [[u"人有悲歡離合", u"月有陰晴圓缺"]]
test_file = "test-utf8-BOM.csv" test_file = "test-utf8-BOM.csv"
save_data(test_file, content, encoding="utf-8-sig", lineterminator="\n") save_data(test_file, content, encoding="utf-8-sig", lineterminator="\n")
custom_encoded_content = get_data(test_file, encoding="utf-8-sig") custom_encoded_content = get_data(test_file, encoding="utf-8-sig")
assert custom_encoded_content[test_file] == content assert custom_encoded_content[test_file] == content
with open(test_file, "rb") as f: with open(test_file, "rb") as f:
content = f.read() content = f.read()
assert content[0:3] == b'\xef\xbb\xbf' assert content[0:3] == b"\xef\xbb\xbf"
os.unlink(test_file) os.unlink(test_file)
@ -87,96 +83,100 @@ def test_issue_33_34_utf32_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-32') check_mmap_encoding("utf-32")
def test_issue_33_34_utf32be_encoded_file(): def test_issue_33_34_utf32be_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-32-be') check_mmap_encoding("utf-32-be")
def test_issue_33_34_utf32le_encoded_file(): def test_issue_33_34_utf32le_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-32-le') check_mmap_encoding("utf-32-le")
def test_issue_33_34_utf16_encoded_file(): def test_issue_33_34_utf16_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-16') check_mmap_encoding("utf-16")
def test_issue_33_34_utf16be_encoded_file(): def test_issue_33_34_utf16be_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-16-be') check_mmap_encoding("utf-16-be")
def test_issue_33_34_utf16le_encoded_file(): def test_issue_33_34_utf16le_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-16-le') check_mmap_encoding("utf-16-le")
def test_issue_33_34_utf8_encoded_file(): def test_issue_33_34_utf8_encoded_file():
if PY26: if PY26:
pass pass
else: else:
check_mmap_encoding('utf-8') check_mmap_encoding("utf-8")
def check_mmap_encoding(encoding): def check_mmap_encoding(encoding):
import mmap import mmap
content = [ content = [
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot'], [u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']] [u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
]
test_file = "test-%s-encoding-in-mmap-file.csv" % encoding test_file = "test-%s-encoding-in-mmap-file.csv" % encoding
save_data(test_file, content, encoding=encoding) save_data(test_file, content, encoding=encoding)
with open(test_file, 'r+b') as f: with open(test_file, "r+b") as f:
memory_mapped_file = mmap.mmap( memory_mapped_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
f.fileno(), 0, access=mmap.ACCESS_READ) data = get_data(memory_mapped_file, file_type="csv", encoding=encoding)
data = get_data(memory_mapped_file, eq_(data["csv"], content)
file_type='csv', encoding=encoding)
eq_(data['csv'], content)
os.unlink(test_file) os.unlink(test_file)
def test_issue_35_encoding_for_file_content(): def test_issue_35_encoding_for_file_content():
encoding = 'utf-16' encoding = "utf-16"
content = [ content = [
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot'], [u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']] [u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
]
test_file = "test-%s-encoding-in-mmap-file.csv" % encoding test_file = "test-%s-encoding-in-mmap-file.csv" % encoding
save_data(test_file, content, encoding=encoding) save_data(test_file, content, encoding=encoding)
with open(test_file, 'r+b') as f: with open(test_file, "r+b") as f:
csv_content = f.read() csv_content = f.read()
data = get_data(csv_content, file_type='csv', encoding=encoding) data = get_data(csv_content, file_type="csv", encoding=encoding)
eq_(data['csv'], content) eq_(data["csv"], content)
os.unlink(test_file) os.unlink(test_file)
def test_issue_43(): def test_issue_43():
#if not IN_TRAVIS: # if not IN_TRAVIS:
# raise SkipTest() # raise SkipTest()
p.get_book(url="https://github.com/pyexcel/pyexcel-xls/raw/master/tests/fixtures/file_with_an_empty_sheet.xls"); # flake8: noqa p.get_book(
url="https://github.com/pyexcel/pyexcel-xls/raw/master/tests/fixtures/file_with_an_empty_sheet.xls"
)
# flake8: noqa
def test_pyexcel_issue_138(): def test_pyexcel_issue_138():
array = [['123_122', '123_1.', '123_1.0']] array = [["123_122", "123_1.", "123_1.0"]]
save_data('test.csv', array) save_data("test.csv", array)
data = get_data('test.csv') data = get_data("test.csv")
expected = [['123_122', '123_1.', '123_1.0']] expected = [["123_122", "123_1.", "123_1.0"]]
eq_(data['test.csv'], expected) eq_(data["test.csv"], expected)
os.unlink('test.csv') os.unlink("test.csv")
def get_fixture(file_name): def get_fixture(file_name):

View File

@ -11,23 +11,15 @@ from pyexcel_io.writers.tsv import TSVBookWriter
class TestCSVReaders(TestCase): class TestCSVReaders(TestCase):
file_type = 'csv' file_type = "csv"
reader_class = CSVBookReader reader_class = CSVBookReader
delimiter = ',' delimiter = ","
def setUp(self): def setUp(self):
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"], self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
["4", "5", "6"], with open(self.test_file, "w") as f:
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
with open(self.test_file, 'w') as f:
for row in self.data: for row in self.data:
f.write(self.delimiter.join(row) + "\n") f.write(self.delimiter.join(row) + "\n")
@ -39,7 +31,7 @@ class TestCSVReaders(TestCase):
def test_book_reader_from_memory_source(self): def test_book_reader_from_memory_source(self):
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
with open(self.test_file, 'r') as f: with open(self.test_file, "r") as f:
io.write(f.read()) io.write(f.read())
io.seek(0) io.seek(0)
b = self.reader_class() b = self.reader_class()
@ -54,27 +46,19 @@ class TestCSVReaders(TestCase):
class TestTSVReaders(TestCSVReaders): class TestTSVReaders(TestCSVReaders):
file_type = "tsv" file_type = "tsv"
reader_class = TSVBookReader reader_class = TSVBookReader
delimiter = '\t' delimiter = "\t"
class TestReadMultipleSheets(TestCase): class TestReadMultipleSheets(TestCase):
file_type = "csv" file_type = "csv"
reader_class = CSVBookReader reader_class = CSVBookReader
delimiter = ',' delimiter = ","
def setUp(self): def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type self.merged_book_file = "csv_multiple." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"], self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.sheets = OrderedDict() self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data}) self.sheets.update({"sheet1": self.data})
self.sheets.update({"sheet2": self.data}) self.sheets.update({"sheet2": self.data})
@ -86,7 +70,7 @@ class TestReadMultipleSheets(TestCase):
index = 0 index = 0
for key, value in self.sheets.items(): for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index) file_name = self.test_file_formatter % (key, index)
with open(file_name, 'w') as f: with open(file_name, "w") as f:
for row in value: for row in value:
f.write(self.delimiter.join(row) + "\n") f.write(self.delimiter.join(row) + "\n")
index = index + 1 index = index + 1
@ -103,8 +87,9 @@ class TestReadMultipleSheets(TestCase):
b = self.reader_class() b = self.reader_class()
b.open(self.merged_book_file) b.open(self.merged_book_file)
sheets = b.read_sheet_by_name("sheet1") sheets = b.read_sheet_by_name("sheet1")
self.assertEqual(list(sheets["sheet1"]), self.assertEqual(
self.expected_sheets["sheet1"]) list(sheets["sheet1"]), self.expected_sheets["sheet1"]
)
@raises(ValueError) @raises(ValueError)
def test_read_one_from_many_by_non_existent_name(self): def test_read_one_from_many_by_non_existent_name(self):
@ -116,8 +101,9 @@ class TestReadMultipleSheets(TestCase):
b = self.reader_class() b = self.reader_class()
b.open(self.merged_book_file) b.open(self.merged_book_file)
sheets = b.read_sheet_by_index(1) sheets = b.read_sheet_by_index(1)
self.assertEqual(list(sheets["sheet2"]), self.assertEqual(
self.expected_sheets["sheet2"]) list(sheets["sheet2"]), self.expected_sheets["sheet2"]
)
@raises(IndexError) @raises(IndexError)
def test_read_one_from_many_by_wrong_index(self): def test_read_one_from_many_by_wrong_index(self):
@ -134,31 +120,38 @@ class TestReadMultipleSheets(TestCase):
class TestTSVBookReaders(TestReadMultipleSheets): class TestTSVBookReaders(TestReadMultipleSheets):
file_type = 'tsv' file_type = "tsv"
reader_class = TSVBookReader reader_class = TSVBookReader
delimiter = '\t' delimiter = "\t"
class TestWriteMultipleSheets(TestCase): class TestWriteMultipleSheets(TestCase):
file_type = "csv" file_type = "csv"
writer_class = CSVBookWriter writer_class = CSVBookWriter
reader_class = CSVBookReader reader_class = CSVBookReader
result1 = dedent(""" result1 = dedent(
"""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,9 7,8,9
""").strip('\n') """
result2 = dedent(""" ).strip("\n")
result2 = dedent(
"""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,1000 7,8,1000
""").strip('\n') """
result3 = dedent(""" ).strip("\n")
result3 = dedent(
"""
1,2,3 1,2,3
4,5,6888 4,5,6888
7,8,9 7,8,9
""").strip('\n') """
merged = dedent("""\ ).strip("\n")
merged = dedent(
"""\
---pyexcel:sheet1--- ---pyexcel:sheet1---
1,2,3 1,2,3
4,5,6 4,5,6
@ -174,41 +167,18 @@ class TestWriteMultipleSheets(TestCase):
4,5,6888 4,5,6888
7,8,9 7,8,9
---pyexcel--- ---pyexcel---
""") """
)
def setUp(self): def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type self.merged_book_file = "csv_multiple." + self.file_type
self.data1 = [ self.data1 = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"], self.data2 = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "1000"]]
["4", "5", "6"], self.data3 = [["1", "2", "3"], ["4", "5", "6888"], ["7", "8", "9"]]
["7", "8", "9"] self.expected_data1 = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
] self.expected_data2 = [[1, 2, 3], [4, 5, 6], [7, 8, 1000]]
self.data2 = [ self.expected_data3 = [[1, 2, 3], [4, 5, 6888], [7, 8, 9]]
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "1000"]
]
self.data3 = [
["1", "2", "3"],
["4", "5", "6888"],
["7", "8", "9"]
]
self.expected_data1 = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.expected_data2 = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 1000]
]
self.expected_data3 = [
[1, 2, 3],
[4, 5, 6888],
[7, 8, 9]
]
self.sheets = OrderedDict() self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data1}) self.sheets.update({"sheet1": self.data1})
self.sheets.update({"sheet2": self.data2}) self.sheets.update({"sheet2": self.data2})
@ -230,16 +200,16 @@ class TestWriteMultipleSheets(TestCase):
index = 0 index = 0
for key, value in self.sheets.items(): for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index) file_name = self.test_file_formatter % (key, index)
with open(file_name, 'r') as f: with open(file_name, "r") as f:
content = f.read().replace('\r', '') content = f.read().replace("\r", "")
assert content.strip('\n') == self.result_dict[key] assert content.strip("\n") == self.result_dict[key]
index = index + 1 index = index + 1
self.delete_files() self.delete_files()
def test_multiple_sheet_into_memory(self): def test_multiple_sheet_into_memory(self):
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
w = self.writer_class() w = self.writer_class()
w.open(io, lineterminator='\n') w.open(io, lineterminator="\n")
w.write(self.sheets) w.write(self.sheets)
w.close() w.close()
content = io.getvalue() content = io.getvalue()
@ -249,11 +219,11 @@ class TestWriteMultipleSheets(TestCase):
"""Write csv book into a single stream""" """Write csv book into a single stream"""
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
w = self.writer_class() w = self.writer_class()
w.open(io, lineterminator='\n') w.open(io, lineterminator="\n")
w.write(self.sheets) w.write(self.sheets)
w.close() w.close()
reader = self.reader_class() reader = self.reader_class()
reader.open_stream(io, lineterminator='\n', multiple_sheets=True) reader.open_stream(io, lineterminator="\n", multiple_sheets=True)
sheets = reader.read_all() sheets = reader.read_all()
for sheet in sheets: for sheet in sheets:
sheets[sheet] = list(sheets[sheet]) sheets[sheet] = list(sheets[sheet])
@ -268,25 +238,32 @@ class TestWriteMultipleSheets(TestCase):
class TestTSVWriteMultipleSheets(TestWriteMultipleSheets): class TestTSVWriteMultipleSheets(TestWriteMultipleSheets):
file_type = 'tsv' file_type = "tsv"
writer_class = TSVBookWriter writer_class = TSVBookWriter
reader_class = TSVBookReader reader_class = TSVBookReader
result1 = dedent(""" result1 = dedent(
"""
1\t2\t3 1\t2\t3
4\t5\t6 4\t5\t6
7\t8\t9 7\t8\t9
""").strip('\n') """
result2 = dedent(""" ).strip("\n")
result2 = dedent(
"""
1\t2\t3 1\t2\t3
4\t5\t6 4\t5\t6
7\t8\t1000 7\t8\t1000
""").strip('\n') """
result3 = dedent(""" ).strip("\n")
result3 = dedent(
"""
1\t2\t3 1\t2\t3
4\t5\t6888 4\t5\t6888
7\t8\t9 7\t8\t9
""").strip('\n') """
merged = dedent("""\ ).strip("\n")
merged = dedent(
"""\
---pyexcel:sheet1--- ---pyexcel:sheet1---
1\t2\t3 1\t2\t3
4\t5\t6 4\t5\t6
@ -302,65 +279,64 @@ class TestTSVWriteMultipleSheets(TestWriteMultipleSheets):
4\t5\t6888 4\t5\t6888
7\t8\t9 7\t8\t9
---pyexcel--- ---pyexcel---
""") """
)
class TestWriter(TestCase): class TestWriter(TestCase):
file_type = 'csv' file_type = "csv"
writer_class = CSVBookWriter writer_class = CSVBookWriter
result = dedent(""" result = dedent(
"""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,9 7,8,9
""").strip('\n') """
).strip("\n")
def setUp(self): def setUp(self):
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
def test_book_writer(self): def test_book_writer(self):
w = self.writer_class() w = self.writer_class()
w.open(self.test_file) w.open(self.test_file)
w.write({None: self.data}) w.write({None: self.data})
w.close() w.close()
with open(self.test_file, 'r') as f: with open(self.test_file, "r") as f:
content = f.read().replace('\r', '') content = f.read().replace("\r", "")
self.assertEqual(content.strip('\n'), self.result) self.assertEqual(content.strip("\n"), self.result)
def tearDown(self): def tearDown(self):
os.unlink(self.test_file) os.unlink(self.test_file)
class TestTSVWriters(TestWriter): class TestTSVWriters(TestWriter):
file_type = 'tsv' file_type = "tsv"
writer_class = TSVBookWriter writer_class = TSVBookWriter
result = dedent(""" result = dedent(
"""
1\t2\t3 1\t2\t3
4\t5\t6 4\t5\t6
7\t8\t9 7\t8\t9
""").strip('\n') """
).strip("\n")
class TestMemoryWriter(TestCase): class TestMemoryWriter(TestCase):
file_type = "csv" file_type = "csv"
writer_class = CSVBookWriter writer_class = CSVBookWriter
result = dedent(""" result = dedent(
"""
1,2,3 1,2,3
4,5,6 4,5,6
7,8,9 7,8,9
""").strip('\n') """
).strip("\n")
def setUp(self): def setUp(self):
self.test_file = "csv_book." + self.file_type self.test_file = "csv_book." + self.file_type
self.data = [ self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
def test_book_writer_to_memroy(self): def test_book_writer_to_memroy(self):
io = manager.get_io(self.file_type) io = manager.get_io(self.file_type)
@ -368,15 +344,17 @@ class TestMemoryWriter(TestCase):
w.open(io, single_sheet_in_book=True) w.open(io, single_sheet_in_book=True)
w.write({self.file_type: self.data}) w.write({self.file_type: self.data})
w.close() w.close()
content = io.getvalue().replace('\r', '') content = io.getvalue().replace("\r", "")
assert content.strip('\n') == self.result assert content.strip("\n") == self.result
class TestTSVMemoryWriter(TestMemoryWriter): class TestTSVMemoryWriter(TestMemoryWriter):
file_type = 'tsv' file_type = "tsv"
writer_class = TSVBookWriter writer_class = TSVBookWriter
result = dedent(""" result = dedent(
"""
1\t2\t3 1\t2\t3
4\t5\t6 4\t5\t6
7\t8\t9 7\t8\t9
""").strip('\n') """
).strip("\n")

View File

@ -11,11 +11,12 @@ from pyexcel_io.writers.tsvz import TSVZipBookWriter
import zipfile import zipfile
from nose.tools import raises from nose.tools import raises
import sys import sys
PY2 = sys.version_info[0] == 2 PY2 = sys.version_info[0] == 2
class TestCSVZ(TestCase): class TestCSVZ(TestCase):
file_type = 'csvz' file_type = "csvz"
writer_class = CSVZipBookWriter writer_class = CSVZipBookWriter
reader_class = CSVZipBookReader reader_class = CSVZipBookReader
result = u"中,文,1,2,3" result = u"中,文,1,2,3"
@ -24,23 +25,21 @@ class TestCSVZ(TestCase):
self.file = "csvz." + self.file_type self.file = "csvz." + self.file_type
def test_writing(self): def test_writing(self):
data = [[u'', u'', 1, 2, 3]] data = [[u"", u"", 1, 2, 3]]
file_name = 'pyexcel_sheet1.' + self.file_type[0:3] file_name = "pyexcel_sheet1." + self.file_type[0:3]
zipbook = self.writer_class() zipbook = self.writer_class()
zipbook.open(self.file) zipbook.open(self.file)
zipbook.write({None: data}) zipbook.write({None: data})
zipbook.close() zipbook.close()
zip = zipfile.ZipFile(self.file, 'r') zip = zipfile.ZipFile(self.file, "r")
self.assertEqual(zip.namelist(), [file_name]) self.assertEqual(zip.namelist(), [file_name])
content = zip.read(file_name) content = zip.read(file_name)
content = content.decode('utf-8') content = content.decode("utf-8")
self.assertEqual( self.assertEqual(content.replace("\r", "").strip("\n"), self.result)
content.replace('\r', '').strip('\n'),
self.result)
zip.close() zip.close()
def test_reading(self): def test_reading(self):
data = [[u'', u'', 1, 2, 3]] data = [[u"", u"", 1, 2, 3]]
zipbook = self.writer_class() zipbook = self.writer_class()
zipbook.open(self.file) zipbook.open(self.file)
zipbook.write({None: data}) zipbook.write({None: data})
@ -48,8 +47,7 @@ class TestCSVZ(TestCase):
zipreader = self.reader_class() zipreader = self.reader_class()
zipreader.open(self.file) zipreader.open(self.file)
data = zipreader.read_all() data = zipreader.read_all()
self.assertEqual( self.assertEqual(list(data["pyexcel_sheet1"]), [[u"", u"", 1, 2, 3]])
list(data['pyexcel_sheet1']), [[u'', u'', 1, 2, 3]])
zipreader.close() zipreader.close()
def tearDown(self): def tearDown(self):
@ -57,7 +55,7 @@ class TestCSVZ(TestCase):
class TestTSVZ(TestCSVZ): class TestTSVZ(TestCSVZ):
file_type = 'tsvz' file_type = "tsvz"
writer_class = TSVZipBookWriter writer_class = TSVZipBookWriter
reader_class = TSVZipBookReader reader_class = TSVZipBookReader
result = u"\t\t1\t2\t3" result = u"\t\t1\t2\t3"
@ -73,7 +71,7 @@ def test_reading_from_memory():
zipreader = CSVZipBookReader() zipreader = CSVZipBookReader()
zipreader.open_stream(io) zipreader.open_stream(io)
data = zipreader.read_all() data = zipreader.read_all()
assert list(data['pyexcel_sheet1']) == [[1, 2, 3]] assert list(data["pyexcel_sheet1"]) == [[1, 2, 3]]
def test_reading_from_memory_tsvz(): def test_reading_from_memory_tsvz():
@ -86,7 +84,7 @@ def test_reading_from_memory_tsvz():
zipreader = TSVZipBookReader() zipreader = TSVZipBookReader()
zipreader.open_stream(io) zipreader.open_stream(io)
data = zipreader.read_all() data = zipreader.read_all()
assert list(data['pyexcel_sheet1']) == [[1, 2, 3]] assert list(data["pyexcel_sheet1"]) == [[1, 2, 3]]
class TestMultipleSheet(TestCase): class TestMultipleSheet(TestCase):
@ -95,41 +93,24 @@ class TestMultipleSheet(TestCase):
def setUp(self): def setUp(self):
self.content = OrderedDict() self.content = OrderedDict()
self.content.update({ self.content.update(
'Sheet 1': {"Sheet 1": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]}
[ )
[1.0, 2.0, 3.0], self.content.update(
[4.0, 5.0, 6.0], {"Sheet 2": [["X", "Y", "Z"], [1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]}
[7.0, 8.0, 9.0] )
] self.content.update(
}) {"Sheet 3": [["O", "P", "Q"], [3.0, 2.0, 1.0], [4.0, 3.0, 2.0]]}
self.content.update({ )
'Sheet 2':
[
['X', 'Y', 'Z'],
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]
]
})
self.content.update({
'Sheet 3':
[
['O', 'P', 'Q'],
[3.0, 2.0, 1.0],
[4.0, 3.0, 2.0]
]
})
save_data(self.file_name, self.content) save_data(self.file_name, self.content)
def test_read_one_from_many_by_name(self): def test_read_one_from_many_by_name(self):
reader = self.reader_class() reader = self.reader_class()
reader.open(self.file_name) reader.open(self.file_name)
sheets = reader.read_sheet_by_name("Sheet 1") sheets = reader.read_sheet_by_name("Sheet 1")
self.assertEqual(list(sheets['Sheet 1']), [ self.assertEqual(
[1, 2, 3], list(sheets["Sheet 1"]), [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[4, 5, 6], )
[7, 8, 9]
])
@raises(ValueError) @raises(ValueError)
def test_read_one_from_many_by_unknown_name(self): def test_read_one_from_many_by_unknown_name(self):
@ -141,11 +122,9 @@ class TestMultipleSheet(TestCase):
reader = self.reader_class() reader = self.reader_class()
reader.open(self.file_name) reader.open(self.file_name)
sheets = reader.read_sheet_by_index(0) sheets = reader.read_sheet_by_index(0)
self.assertEqual(list(sheets['Sheet 1']), [ self.assertEqual(
[1, 2, 3], list(sheets["Sheet 1"]), [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[4, 5, 6], )
[7, 8, 9]
])
@raises(IndexError) @raises(IndexError)
def test_read_one_from_many_by_unknown_index(self): def test_read_one_from_many_by_unknown_index(self):

View File

@ -10,12 +10,14 @@ from pyexcel_io._compact import text_type
class TestDateFormat(TestCase): class TestDateFormat(TestCase):
def setUp(self): def setUp(self):
self.excel_filename = "testdateformat.csv" self.excel_filename = "testdateformat.csv"
self.data = [[ self.data = [
'01/06/2016', [
datetime.date(2014, 12, 25), "01/06/2016",
datetime.datetime(2014, 12, 25, 11, 11, 11), datetime.date(2014, 12, 25),
datetime.datetime(2014, 12, 25, 11, 11, 11, 10) datetime.datetime(2014, 12, 25, 11, 11, 11),
]] datetime.datetime(2014, 12, 25, 11, 11, 11, 10),
]
]
pe.save_as(dest_file_name=self.excel_filename, array=self.data) pe.save_as(dest_file_name=self.excel_filename, array=self.data)
def test_auto_detect_float(self): def test_auto_detect_float(self):
@ -23,13 +25,17 @@ class TestDateFormat(TestCase):
self.assertEqual(sheet.to_array(), self.data) self.assertEqual(sheet.to_array(), self.data)
def test_auto_detect_float_false(self): def test_auto_detect_float_false(self):
expected = [[ expected = [
'01/06/2016', [
'2014-12-25', "01/06/2016",
'2014-12-25 11:11:11', "2014-12-25",
'2014-12-25 11:11:11.000010']] "2014-12-25 11:11:11",
sheet = pe.get_sheet(file_name=self.excel_filename, "2014-12-25 11:11:11.000010",
auto_detect_datetime=False) ]
]
sheet = pe.get_sheet(
file_name=self.excel_filename, auto_detect_datetime=False
)
self.assertEqual(sheet.to_array(), expected) self.assertEqual(sheet.to_array(), expected)
def tearDown(self): def tearDown(self):
@ -44,38 +50,46 @@ class TestAutoDetectInt(TestCase):
def test_auto_detect_int(self): def test_auto_detect_int(self):
sheet = pe.get_sheet(file_name=self.test_file) sheet = pe.get_sheet(file_name=self.test_file)
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+---+---+-----+ +---+---+-----+
| 1 | 2 | 3.1 | | 1 | 2 | 3.1 |
+---+---+-----+""").strip() +---+---+-----+"""
).strip()
self.assertEqual(str(sheet), expected) self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_int(self): def test_get_book_auto_detect_int(self):
book = pe.get_book(file_name=self.test_file) book = pe.get_book(file_name=self.test_file)
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+---+---+-----+ +---+---+-----+
| 1 | 2 | 3.1 | | 1 | 2 | 3.1 |
+---+---+-----+""").strip() +---+---+-----+"""
).strip()
self.assertEqual(str(book), expected) self.assertEqual(str(book), expected)
def test_auto_detect_int_false(self): def test_auto_detect_int_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_int=False) sheet = pe.get_sheet(file_name=self.test_file, auto_detect_int=False)
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+-----+-----+-----+ +-----+-----+-----+
| 1.0 | 2.0 | 3.1 | | 1.0 | 2.0 | 3.1 |
+-----+-----+-----+""").strip() +-----+-----+-----+"""
).strip()
self.assertEqual(str(sheet), expected) self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_int_false(self): def test_get_book_auto_detect_int_false(self):
book = pe.get_book(file_name=self.test_file, auto_detect_int=False) book = pe.get_book(file_name=self.test_file, auto_detect_int=False)
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+-----+-----+-----+ +-----+-----+-----+
| 1.0 | 2.0 | 3.1 | | 1.0 | 2.0 | 3.1 |
+-----+-----+-----+""").strip() +-----+-----+-----+"""
).strip()
self.assertEqual(str(book), expected) self.assertEqual(str(book), expected)
def tearDown(self): def tearDown(self):
@ -96,22 +110,26 @@ class TestAutoDetectFloat(TestCase):
def test_auto_detect_float_false(self): def test_auto_detect_float_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False) sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(sheet.to_array(), [[1, '2.0', '3.1']]) self.assertEqual(sheet.to_array(), [[1, "2.0", "3.1"]])
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+---+-----+-----+ +---+-----+-----+
| 1 | 2.0 | 3.1 | | 1 | 2.0 | 3.1 |
+---+-----+-----+""").strip() +---+-----+-----+"""
).strip()
self.assertEqual(str(sheet), expected) self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_float_false(self): def test_get_book_auto_detect_float_false(self):
book = pe.get_book(file_name=self.test_file, auto_detect_float=False) book = pe.get_book(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(book[0].to_array(), [[1, '2.0', '3.1']]) self.assertEqual(book[0].to_array(), [[1, "2.0", "3.1"]])
expected = dedent(""" expected = dedent(
"""
test_auto_detect_init.csv: test_auto_detect_init.csv:
+---+-----+-----+ +---+-----+-----+
| 1 | 2.0 | 3.1 | | 1 | 2.0 | 3.1 |
+---+-----+-----+""").strip() +---+-----+-----+"""
).strip()
self.assertEqual(str(book), expected) self.assertEqual(str(book), expected)
def tearDown(self): def tearDown(self):
@ -126,24 +144,23 @@ class TestSpecialStrings(TestCase):
""" """
def setUp(self): def setUp(self):
self.content = [['01', 1, 2.0, 3.1, 'NaN', 'nan']] self.content = [["01", 1, 2.0, 3.1, "NaN", "nan"]]
self.test_file = "test_auto_detect_init.csv" self.test_file = "test_auto_detect_init.csv"
pe.save_as(array=self.content, dest_file_name=self.test_file) pe.save_as(array=self.content, dest_file_name=self.test_file)
def test_auto_detect_float_true(self): def test_auto_detect_float_true(self):
sheet = pe.get_sheet(file_name=self.test_file) sheet = pe.get_sheet(file_name=self.test_file)
self.assertEqual(sheet.to_array(), self.assertEqual(sheet.to_array(), [["01", 1, 2, 3.1, "NaN", "nan"]])
[['01', 1, 2, 3.1, 'NaN', 'nan']])
def test_auto_detect_float_false(self): def test_auto_detect_float_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False) sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(sheet.to_array(), self.assertEqual(
[['01', 1, '2.0', '3.1', 'NaN', 'nan']]) sheet.to_array(), [["01", 1, "2.0", "3.1", "NaN", "nan"]]
)
def test_auto_detect_float_ignore_nan_text(self): def test_auto_detect_float_ignore_nan_text(self):
sheet = pe.get_sheet(file_name=self.test_file, ignore_nan_text=True) sheet = pe.get_sheet(file_name=self.test_file, ignore_nan_text=True)
self.assertEqual(sheet.to_array(), self.assertEqual(sheet.to_array(), [["01", 1, 2.0, 3.1, "NaN", "nan"]])
[['01', 1, 2.0, 3.1, 'NaN', 'nan']])
def test_auto_detect_float_default_float_nan(self): def test_auto_detect_float_default_float_nan(self):
sheet = pe.get_sheet(file_name=self.test_file, default_float_nan="nan") sheet = pe.get_sheet(file_name=self.test_file, default_float_nan="nan")

View File

@ -6,19 +6,17 @@ from pyexcel_io import get_data, save_data
class TestRenderer: class TestRenderer:
def setUp(self): def setUp(self):
self.test_file = "test_filter.csv" self.test_file = "test_filter.csv"
sample = [ sample = [[1, 21, 31], [2, 22, 32]]
[1, 21, 31],
[2, 22, 32]
]
save_data(self.test_file, sample) save_data(self.test_file, sample)
def test_filter_row(self): def test_filter_row(self):
def custom_row_renderer(row): def custom_row_renderer(row):
return [str(element) for element in row] return [str(element) for element in row]
custom_data = get_data(self.test_file,
row_renderer=custom_row_renderer) custom_data = get_data(
expected = [['1', '21', '31'], ['2', '22', '32']] self.test_file, row_renderer=custom_row_renderer
)
expected = [["1", "21", "31"], ["2", "22", "32"]]
eq_(custom_data[self.test_file], expected) eq_(custom_data[self.test_file], expected)
def tearDown(self): def tearDown(self):

View File

@ -24,7 +24,7 @@ def test_date_util_parse():
def test_issue_8_1(): def test_issue_8_1():
# https://github.com/pyexcel/pyexcel-ods3/issues/8 # https://github.com/pyexcel/pyexcel-ods3/issues/8
result = time_value('PT1111') result = time_value("PT1111")
eq_(result, None) eq_(result, None)
@ -50,42 +50,42 @@ def test_fake_date_time_20():
def test_issue_1_error(): def test_issue_1_error():
result = time_value('PT1111') result = time_value("PT1111")
eq_(result, None) eq_(result, None)
def test_detect_int_value(): def test_detect_int_value():
result = detect_int_value('123') result = detect_int_value("123")
eq_(result, 123) eq_(result, 123)
def test_detect_float_value(): def test_detect_float_value():
result = detect_float_value('123.1') result = detect_float_value("123.1")
eq_(result, 123.1) eq_(result, 123.1)
def test_suppression_of_pep_0515_int(): def test_suppression_of_pep_0515_int():
result = detect_int_value('123_123') result = detect_int_value("123_123")
eq_(result, None) eq_(result, None)
def test_suppression_of_pep_0515_float(): def test_suppression_of_pep_0515_float():
result = detect_float_value('123_123.') result = detect_float_value("123_123.")
eq_(result, None) eq_(result, None)
result = detect_float_value('123_123.1') result = detect_float_value("123_123.1")
eq_(result, None) eq_(result, None)
def test_detect_float_value_on_nan(): def test_detect_float_value_on_nan():
result = detect_float_value('NaN', ignore_nan_text=True) result = detect_float_value("NaN", ignore_nan_text=True)
eq_(result, None) eq_(result, None)
def test_detect_float_value_on_custom_nan_text(): def test_detect_float_value_on_custom_nan_text():
result = detect_float_value('NaN', default_float_nan="nan") result = detect_float_value("NaN", default_float_nan="nan")
eq_(result, None) eq_(result, None)
def test_detect_float_value_on_custom_nan_text2(): def test_detect_float_value_on_custom_nan_text2():
result = detect_float_value('nan', default_float_nan="nan") result = detect_float_value("nan", default_float_nan="nan")
eq_(str(result), "nan") eq_(str(result), "nan")

View File

@ -4,13 +4,11 @@ import pyexcel_io.constants as constants
class MyWriter(SheetWriter): class MyWriter(SheetWriter):
def set_size(self, size): def set_size(self, size):
self.native_book = size self.native_book = size
class MyReader(SheetReader): class MyReader(SheetReader):
def number_of_rows(self): def number_of_rows(self):
return len(self._native_sheet) return len(self._native_sheet)
@ -39,10 +37,7 @@ def take_second_column(current_index, start, limit=-1):
def test_custom_skip_row_func(): def test_custom_skip_row_func():
take_second_row = take_second_column take_second_row = take_second_column
array = [ array = [[1, 2, 3], [4, 5, 6]]
[1, 2, 3],
[4, 5, 6]
]
reader = MyReader(array, skip_row_func=take_second_row) reader = MyReader(array, skip_row_func=take_second_row)
actual = list(reader.to_array()) actual = list(reader.to_array())
expected = [[4, 5, 6]] expected = [[4, 5, 6]]
@ -51,15 +46,9 @@ def test_custom_skip_row_func():
def test_custom_skip_column_func(): def test_custom_skip_column_func():
array = [ array = [[1, 2, 3], [4, 5, 6]]
[1, 2, 3],
[4, 5, 6]
]
reader = MyReader(array, skip_column_func=take_second_column) reader = MyReader(array, skip_column_func=take_second_column)
actual = list(reader.to_array()) actual = list(reader.to_array())
expected = [ expected = [[2], [5]]
[2],
[5]
]
eq_(expected, actual) eq_(expected, actual)
reader.close() reader.close()

View File

@ -11,14 +11,17 @@ from pyexcel_io.database.common import (
SQLTableExporter, SQLTableExporter,
SQLTableExportAdapter, SQLTableExportAdapter,
SQLTableImporter, SQLTableImporter,
SQLTableImportAdapter) SQLTableImportAdapter,
)
from pyexcel_io.database.exporters.sqlalchemy import ( from pyexcel_io.database.exporters.sqlalchemy import (
SQLTableReader, SQLTableReader,
SQLBookReader) SQLBookReader,
)
from pyexcel_io.database.importers.sqlalchemy import ( from pyexcel_io.database.importers.sqlalchemy import (
PyexcelSQLSkipRowException, PyexcelSQLSkipRowException,
SQLTableWriter, SQLTableWriter,
SQLBookWriter) SQLBookWriter,
)
from pyexcel_io.database.querysets import QuerysetsReader from pyexcel_io.database.querysets import QuerysetsReader
from sqlalchemy.orm import relationship, backref from sqlalchemy.orm import relationship, backref
from nose.tools import raises, eq_ from nose.tools import raises, eq_
@ -30,7 +33,7 @@ PY36 = PY3 and sys.version_info[1] == 6
engine = None engine = None
if platform.python_implementation() == 'PyPy': if platform.python_implementation() == "PyPy":
engine = create_engine("sqlite:///tmp.db") engine = create_engine("sqlite:///tmp.db")
else: else:
engine = create_engine("sqlite://") engine = create_engine("sqlite://")
@ -39,7 +42,7 @@ Base = declarative_base()
class Pyexcel(Base): class Pyexcel(Base):
__tablename__ = 'pyexcel' __tablename__ = "pyexcel"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
name = Column(String, unique=True) name = Column(String, unique=True)
weight = Column(Float) weight = Column(Float)
@ -47,15 +50,16 @@ class Pyexcel(Base):
class Post(Base): class Post(Base):
__tablename__ = 'post' __tablename__ = "post"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
title = Column(String(80)) title = Column(String(80))
body = Column(String(100)) body = Column(String(100))
pub_date = Column(DateTime) pub_date = Column(DateTime)
category_id = Column(Integer, ForeignKey('category.id')) category_id = Column(Integer, ForeignKey("category.id"))
category = relationship('Category', category = relationship(
backref=backref('posts', lazy='dynamic')) "Category", backref=backref("posts", lazy="dynamic")
)
def __init__(self, title, body, category, pub_date=None): def __init__(self, title, body, category, pub_date=None):
self.title = title self.title = title
@ -66,11 +70,11 @@ class Post(Base):
self.category = category self.category = category
def __repr__(self): def __repr__(self):
return '<Post %r>' % self.title return "<Post %r>" % self.title
class Category(Base): class Category(Base):
__tablename__ = 'category' __tablename__ = "category"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
name = Column(String(50)) name = Column(String(50))
@ -78,7 +82,7 @@ class Category(Base):
self.name = name self.name = name
def __repr__(self): def __repr__(self):
return '<Category %r>' % self.name return "<Category %r>" % self.name
def __str__(self): def __str__(self):
return self.__repr__() return self.__repr__()
@ -91,16 +95,14 @@ class TestSingleRead:
def setUp(self): def setUp(self):
Base.metadata.drop_all(engine) Base.metadata.drop_all(engine)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
p1 = Pyexcel(id=0, p1 = Pyexcel(
name="Adam", id=0, name="Adam", weight=11.25, birth=datetime.date(2014, 11, 11)
weight=11.25, )
birth=datetime.date(2014, 11, 11))
self.session = Session() self.session = Session()
self.session.add(p1) self.session.add(p1)
p1 = Pyexcel(id=1, p1 = Pyexcel(
name="Smith", id=1, name="Smith", weight=12.25, birth=datetime.date(2014, 11, 12)
weight=12.25, )
birth=datetime.date(2014, 11, 12))
self.session.add(p1) self.session.add(p1)
self.session.commit() self.session.commit()
self.session.close() self.session.close()
@ -110,9 +112,9 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel) sheet = SQLTableReader(mysession, Pyexcel)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['2014-11-11', 0, 'Adam', 11.25], ["2014-11-11", 0, "Adam", 11.25],
['2014-11-12', 1, 'Smith', 12.25] ["2014-11-12", 1, "Smith", 12.25],
] ]
# 'pyexcel' here is the table name # 'pyexcel' here is the table name
assert list(data) == content assert list(data) == content
@ -123,14 +125,16 @@ class TestSingleRead:
def custom_renderer(row): def custom_renderer(row):
return [str(element) for element in row] return [str(element) for element in row]
# the key for this test case # the key for this test case
sheet = SQLTableReader(mysession, Pyexcel, sheet = SQLTableReader(
row_renderer=custom_renderer) mysession, Pyexcel, row_renderer=custom_renderer
)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['2014-11-11', '0', 'Adam', '11.25'], ["2014-11-11", "0", "Adam", "11.25"],
['2014-11-12', '1', 'Smith', '12.25'] ["2014-11-12", "1", "Smith", "12.25"],
] ]
eq_(list(data), content) eq_(list(data), content)
mysession.close() mysession.close()
@ -140,8 +144,8 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel, start_row=1) sheet = SQLTableReader(mysession, Pyexcel, start_row=1)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [
['2014-11-11', 0, 'Adam', 11.25], ["2014-11-11", 0, "Adam", 11.25],
['2014-11-12', 1, 'Smith', 12.25] ["2014-11-12", 1, "Smith", 12.25],
] ]
# 'pyexcel'' here is the table name # 'pyexcel'' here is the table name
assert list(data) == content assert list(data) == content
@ -149,12 +153,9 @@ class TestSingleRead:
def test_sql_filter_1(self): def test_sql_filter_1(self):
mysession = Session() mysession = Session()
sheet = SQLTableReader(mysession, Pyexcel, sheet = SQLTableReader(mysession, Pyexcel, start_row=1, row_limit=1)
start_row=1, row_limit=1)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [["2014-11-11", 0, "Adam", 11.25]]
['2014-11-11', 0, 'Adam', 11.25]
]
# 'pyexcel'' here is the table name # 'pyexcel'' here is the table name
assert list(data) == content assert list(data) == content
mysession.close() mysession.close()
@ -164,9 +165,9 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel, start_column=1) sheet = SQLTableReader(mysession, Pyexcel, start_column=1)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [
['id', 'name', 'weight'], ["id", "name", "weight"],
[0, 'Adam', 11.25], [0, "Adam", 11.25],
[1, 'Smith', 12.25] [1, "Smith", 12.25],
] ]
# 'pyexcel'' here is the table name # 'pyexcel'' here is the table name
assert list(data) == content assert list(data) == content
@ -174,14 +175,11 @@ class TestSingleRead:
def test_sql_filter_3(self): def test_sql_filter_3(self):
mysession = Session() mysession = Session()
sheet = SQLTableReader(mysession, Pyexcel, sheet = SQLTableReader(
start_column=1, column_limit=1) mysession, Pyexcel, start_column=1, column_limit=1
)
data = sheet.to_array() data = sheet.to_array()
content = [ content = [["id"], [0], [1]]
['id'],
[0],
[1]
]
# 'pyexcel'' here is the table name # 'pyexcel'' here is the table name
assert list(data) == content assert list(data) == content
mysession.close() mysession.close()
@ -192,14 +190,14 @@ class TestSingleWrite:
Base.metadata.drop_all(engine) Base.metadata.drop_all(engine)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
self.data = [ self.data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25], [datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25] [datetime.date(2014, 11, 12), 1, "Smith", 12.25],
] ]
self.results = [ self.results = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['2014-11-11', 0, 'Adam', 11.25], ["2014-11-11", 0, "Adam", 11.25],
['2014-11-12', 1, 'Smith', 12.25] ["2014-11-12", 1, "Smith", 12.25],
] ]
def test_one_table(self): def test_one_table(self):
@ -230,23 +228,24 @@ class TestSingleWrite:
assert list(results) == self.results assert list(results) == self.results
# update data using custom initializer # update data using custom initializer
update_data = [ update_data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25], [datetime.date(2014, 11, 11), 0, "Adam_E", 12.25],
[datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25] [datetime.date(2014, 11, 12), 1, "Smith_E", 11.25],
] ]
updated_results = [ updated_results = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['2014-11-11', 0, 'Adam_E', 12.25], ["2014-11-11", 0, "Adam_E", 12.25],
['2014-11-12', 1, 'Smith_E', 11.25] ["2014-11-12", 1, "Smith_E", 11.25],
] ]
def row_updater(row): def row_updater(row):
an_instance = mysession.query(Pyexcel).get(row['id']) an_instance = mysession.query(Pyexcel).get(row["id"])
if an_instance is None: if an_instance is None:
an_instance = Pyexcel() an_instance = Pyexcel()
for name in row.keys(): for name in row.keys():
setattr(an_instance, name, row[name]) setattr(an_instance, name, row[name])
return an_instance return an_instance
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = update_data[0] adapter.column_names = update_data[0]
@ -273,15 +272,16 @@ class TestSingleWrite:
assert list(results) == self.results assert list(results) == self.results
# update data using custom initializer # update data using custom initializer
update_data = [ update_data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25], [datetime.date(2014, 11, 11), 0, "Adam_E", 12.25],
[datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25] [datetime.date(2014, 11, 12), 1, "Smith_E", 11.25],
] ]
def row_updater(row): def row_updater(row):
an_instance = mysession.query(Pyexcel).get(row['id']) an_instance = mysession.query(Pyexcel).get(row["id"])
if an_instance is not None: if an_instance is not None:
raise PyexcelSQLSkipRowException() raise PyexcelSQLSkipRowException()
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = update_data[0] adapter.column_names = update_data[0]
@ -297,10 +297,10 @@ class TestSingleWrite:
def test_one_table_with_empty_rows(self): def test_one_table_with_empty_rows(self):
mysession = Session() mysession = Session()
data = [ data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['', '', ''], ["", "", ""],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25], [datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25] [datetime.date(2014, 11, 12), 1, "Smith", 12.25],
] ]
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
@ -316,9 +316,9 @@ class TestSingleWrite:
def test_one_table_with_empty_string_in_unique_field(self): def test_one_table_with_empty_string_in_unique_field(self):
mysession = Session() mysession = Session()
data = [ data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, '', 11.25], [datetime.date(2014, 11, 11), 0, "", 11.25],
[datetime.date(2014, 11, 12), 1, '', 12.25] [datetime.date(2014, 11, 12), 1, "", 12.25],
] ]
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
@ -328,19 +328,21 @@ class TestSingleWrite:
writer.close() writer.close()
query_sets = mysession.query(Pyexcel).all() query_sets = mysession.query(Pyexcel).all()
results = QuerysetsReader(query_sets, data[0]).to_array() results = QuerysetsReader(query_sets, data[0]).to_array()
assert list(results) == [['birth', 'id', 'name', 'weight'], assert list(results) == [
['2014-11-11', 0, None, 11.25], ["birth", "id", "name", "weight"],
['2014-11-12', 1, None, 12.25]] ["2014-11-11", 0, None, 11.25],
["2014-11-12", 1, None, 12.25],
]
mysession.close() mysession.close()
def test_one_table_using_mapdict_as_array(self): def test_one_table_using_mapdict_as_array(self):
mysession = Session() mysession = Session()
self.data = [ self.data = [
["Birth Date", "Id", "Name", "Weight"], ["Birth Date", "Id", "Name", "Weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25], [datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25] [datetime.date(2014, 11, 12), 1, "Smith", 12.25],
] ]
mapdict = ['birth', 'id', 'name', 'weight'] mapdict = ["birth", "id", "name", "weight"]
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
@ -358,14 +360,14 @@ class TestSingleWrite:
mysession = Session() mysession = Session()
self.data = [ self.data = [
["Birth Date", "Id", "Name", "Weight"], ["Birth Date", "Id", "Name", "Weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25], [datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25] [datetime.date(2014, 11, 12), 1, "Smith", 12.25],
] ]
mapdict = { mapdict = {
"Birth Date": 'birth', "Birth Date": "birth",
"Id": 'id', "Id": "id",
"Name": 'name', "Name": "name",
"Weight": 'weight' "Weight": "weight",
} }
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
@ -377,8 +379,8 @@ class TestSingleWrite:
writer.close() writer.close()
query_sets = mysession.query(Pyexcel).all() query_sets = mysession.query(Pyexcel).all()
results = QuerysetsReader( results = QuerysetsReader(
query_sets, query_sets, ["birth", "id", "name", "weight"]
['birth', 'id', 'name', 'weight']).to_array() ).to_array()
assert list(results) == self.results assert list(results) == self.results
mysession.close() mysession.close()
@ -389,44 +391,54 @@ class TestMultipleRead:
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
self.session = Session() self.session = Session()
data = { data = {
"Category": [ "Category": [["id", "name"], [1, "News"], [2, "Sports"]],
["id", "name"],
[1, "News"],
[2, "Sports"]
],
"Post": [ "Post": [
["id", "title", "body", "pub_date", "category"], ["id", "title", "body", "pub_date", "category"],
[1, "Title A", "formal", [
datetime.datetime(2015, 1, 20, 23, 28, 29), "News"], 1,
[2, "Title B", "informal", "Title A",
datetime.datetime(2015, 1, 20, 23, 28, 30), "Sports"] "formal",
] datetime.datetime(2015, 1, 20, 23, 28, 29),
"News",
],
[
2,
"Title B",
"informal",
datetime.datetime(2015, 1, 20, 23, 28, 30),
"Sports",
],
],
} }
def category_init_func(row): def category_init_func(row):
c = Category(row['name']) c = Category(row["name"])
c.id = row['id'] c.id = row["id"]
return c return c
def post_init_func(row): def post_init_func(row):
c = self.session.query(Category).filter_by( c = (
name=row['category']).first() self.session.query(Category)
p = Post(row['title'], row['body'], c, row['pub_date']) .filter_by(name=row["category"])
.first()
)
p = Post(row["title"], row["body"], c, row["pub_date"])
return p return p
importer = SQLTableImporter(self.session) importer = SQLTableImporter(self.session)
category_adapter = SQLTableImportAdapter(Category) category_adapter = SQLTableImportAdapter(Category)
category_adapter.column_names = data['Category'][0] category_adapter.column_names = data["Category"][0]
category_adapter.row_initializer = category_init_func category_adapter.row_initializer = category_init_func
importer.append(category_adapter) importer.append(category_adapter)
post_adapter = SQLTableImportAdapter(Post) post_adapter = SQLTableImportAdapter(Post)
post_adapter.column_names = data['Post'][0] post_adapter.column_names = data["Post"][0]
post_adapter.row_initializer = post_init_func post_adapter.row_initializer = post_init_func
importer.append(post_adapter) importer.append(post_adapter)
writer = SQLBookWriter() writer = SQLBookWriter()
writer.open_content(importer) writer.open_content(importer)
to_store = OrderedDict() to_store = OrderedDict()
to_store.update({category_adapter.get_name(): data['Category'][1:]}) to_store.update({category_adapter.get_name(): data["Category"][1:]})
to_store.update({post_adapter.get_name(): data['Post'][1:]}) to_store.update({post_adapter.get_name(): data["Post"][1:]})
writer.write(to_store) writer.write(to_store)
writer.close() writer.close()
@ -442,18 +454,21 @@ class TestMultipleRead:
for key in data.keys(): for key in data.keys():
data[key] = list(data[key]) data[key] = list(data[key])
assert json.dumps(data) == ( assert json.dumps(data) == (
'{"category": [["id", "name"], [1, "News"], [2, "Sports"]], ' + '{"category": [["id", "name"], [1, "News"], [2, "Sports"]], '
'"post": [["body", "category_id", "id", "pub_date", "title"], ' + + '"post": [["body", "category_id", "id", "pub_date", "title"], '
'["formal", 1, 1, "2015-01-20T23:28:29", "Title A"], ' + + '["formal", 1, 1, "2015-01-20T23:28:29", "Title A"], '
'["informal", 2, 2, "2015-01-20T23:28:30", "Title B"]]}') + '["informal", 2, 2, "2015-01-20T23:28:30", "Title B"]]}'
)
def test_foreign_key(self): def test_foreign_key(self):
all_posts = self.session.query(Post).all() all_posts = self.session.query(Post).all()
column_names = ['category__name', 'title'] column_names = ["category__name", "title"]
data = list(QuerysetsReader(all_posts, column_names).to_array()) data = list(QuerysetsReader(all_posts, column_names).to_array())
eq_(json.dumps(data), eq_(
'[["category__name", "title"], ["News", "Title A"],' + json.dumps(data),
' ["Sports", "Title B"]]') '[["category__name", "title"], ["News", "Title A"],'
+ ' ["Sports", "Title B"]]',
)
def tearDown(self): def tearDown(self):
self.session.close() self.session.close()
@ -479,14 +494,14 @@ class TestNoAutoCommit:
Base.metadata.drop_all(engine) Base.metadata.drop_all(engine)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
self.data = [ self.data = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25], [datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25] [datetime.date(2014, 11, 12), 1, "Smith", 12.25],
] ]
self.results = [ self.results = [
['birth', 'id', 'name', 'weight'], ["birth", "id", "name", "weight"],
['2014-11-11', 0, 'Adam', 11.25], ["2014-11-11", 0, "Adam", 11.25],
['2014-11-12', 1, 'Smith', 12.25] ["2014-11-12", 1, "Smith", 12.25],
] ]
def test_one_table(self): def test_one_table(self):
@ -498,8 +513,7 @@ class TestNoAutoCommit:
importer = SQLTableImporter(mysession) importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = self.data[0] adapter.column_names = self.data[0]
writer = SQLTableWriter(importer, adapter, writer = SQLTableWriter(importer, adapter, auto_commit=False)
auto_commit=False)
writer.write_array(self.data[1:]) writer.write_array(self.data[1:])
writer.close() writer.close()
mysession.close() mysession.close()
@ -523,7 +537,7 @@ def test_not_implemented_method_2():
def test_sql_table_import_adapter(): def test_sql_table_import_adapter():
adapter = SQLTableImportAdapter(Pyexcel) adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = ['a'] adapter.column_names = ["a"]
adapter.row_initializer = "abc" adapter.row_initializer = "abc"
eq_(adapter.row_initializer, "abc") eq_(adapter.row_initializer, "abc")
@ -532,10 +546,10 @@ def test_sql_table_import_adapter():
def test_unknown_sheet(self): def test_unknown_sheet(self):
importer = SQLTableImporter(None) importer = SQLTableImporter(None)
category_adapter = SQLTableImportAdapter(Category) category_adapter = SQLTableImportAdapter(Category)
category_adapter.column_names = [''] category_adapter.column_names = [""]
importer.append(category_adapter) importer.append(category_adapter)
writer = SQLBookWriter() writer = SQLBookWriter()
writer.open_content(importer) writer.open_content(importer)
to_store = OrderedDict() to_store = OrderedDict()
to_store.update({'you do not see me': [[]]}) to_store.update({"you do not see me": [[]]})
writer.write(to_store) writer.write(to_store)