🔨 apply black on python files

chfw 2018-08-23 19:08:20 +01:00
parent 574b892316
commit bef818481a
25 changed files with 606 additions and 688 deletions
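For context, a whole-tree reformat of this shape is what a plain Black run produces. The exact command and options used for this commit are not recorded in the diff, so the sketch below is an assumption; the directory names are inferred from the import paths and test fixtures visible in the hunks.

import subprocess

# Assumed invocation: reformat the package and the test suite in place
# with Black's default settings (line length, string normalization).
subprocess.run(["black", "pyexcel_io", "tests"], check=True)

# Optional follow-up: verify formatting without rewriting files,
# e.g. as a CI gate; --check exits non-zero if anything would change.
subprocess.run(["black", "--check", "pyexcel_io", "tests"], check=True)

The double-quoted string literals, exploded argument lists, and trailing commas in the hunks below are typical Black output.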

View File

@ -18,7 +18,9 @@ import pyexcel_io.plugins as plugins
BLACK_LIST = [__name__, "pyexcel_webio", "pyexcel_text"]
WHITE_LIST = [
"pyexcel_io.readers", "pyexcel_io.writers", "pyexcel_io.database"
"pyexcel_io.readers",
"pyexcel_io.writers",
"pyexcel_io.database",
]
PREFIX = "pyexcel_"

View File

@ -33,7 +33,6 @@ try:
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
@ -46,7 +45,6 @@ if PY2:
irange = xrange
class Iterator(object):
def next(self):
return type(self).__next__(self)

View File

@ -16,6 +16,7 @@ class RWInterface(object):
"""
The common methods for book reader and writer
"""
stream_type = None
def __init__(self):

View File

@ -16,11 +16,21 @@ MESSAGE_INVALID_PARAMETERS = "Invalid parameters"
MESSAGE_ERROR_02 = "No content, file name. Nothing is given"
MESSAGE_ERROR_03 = "cannot handle unknown content"
MESSAGE_WRONG_IO_INSTANCE = "Wrong io instance is passed for your file format."
MESSAGE_CANNOT_WRITE_STREAM_FORMATTER = "Cannot write content of file type %s to stream"
MESSAGE_CANNOT_READ_STREAM_FORMATTER = "Cannot read content of file type %s from stream"
MESSAGE_CANNOT_WRITE_FILE_TYPE_FORMATTER = "Cannot write content of file type %s to file %s"
MESSAGE_CANNOT_READ_FILE_TYPE_FORMATTER = "Cannot read content of file type %s from file %s"
MESSAGE_LOADING_FORMATTER = "The plugin for file type %s is not installed. Please install %s"
MESSAGE_CANNOT_WRITE_STREAM_FORMATTER = (
"Cannot write content of file type %s to stream"
)
MESSAGE_CANNOT_READ_STREAM_FORMATTER = (
"Cannot read content of file type %s from stream"
)
MESSAGE_CANNOT_WRITE_FILE_TYPE_FORMATTER = (
"Cannot write content of file type %s to file %s"
)
MESSAGE_CANNOT_READ_FILE_TYPE_FORMATTER = (
"Cannot read content of file type %s from file %s"
)
MESSAGE_LOADING_FORMATTER = (
"The plugin for file type %s is not installed. Please install %s"
)
MESSAGE_EMPTY_ARRAY = "One empty row is found"
MESSAGE_IGNORE_ROW = "One row is ignored"
MESSAGE_DB_EXCEPTION = """
@ -47,5 +57,5 @@ STOP_ITERATION = 1
DEFAULT_MULTI_CSV_SEPARATOR = "__"
SEPARATOR_FORMATTER = "---%s---" % DEFAULT_NAME + "%s"
SEPARATOR_MATCHER = "---%s:(.*)---" % DEFAULT_NAME
DEFAULT_CSV_STREAM_FILE_FORMATTER = ("---%s:" % DEFAULT_NAME + "%s---%s")
DEFAULT_CSV_STREAM_FILE_FORMATTER = "---%s:" % DEFAULT_NAME + "%s---%s"
DEFAULT_CSV_NEWLINE = "\r\n"

View File

@ -81,8 +81,7 @@ class DjangoBookWriter(BookWriter):
)
else:
raise Exception(
"Sheet: %s does not match any given models."
% sheet_name
"Sheet: %s does not match any given models." % sheet_name
+ "Please be aware of case sensitivity."
)

View File

@ -18,6 +18,7 @@ class PyexcelSQLSkipRowException(Exception):
Raised this exception to skipping a row
while data import
"""
pass
@ -93,8 +94,7 @@ class SQLBookWriter(BookWriter):
)
else:
raise Exception(
"Sheet: %s does not match any given tables."
% sheet_name
"Sheet: %s does not match any given tables." % sheet_name
+ "Please be aware of case sensitivity."
)

View File

@ -11,14 +11,17 @@
class NoSupportingPluginFound(Exception):
"""raised when an known file extension is seen"""
pass
class SupportingPluginAvailableButNotInstalled(Exception):
"""raised when a known plugin is not installed"""
pass
class UpgradePlugin(Exception):
"""raised when a known plugin is not compatible"""
pass

View File

@ -180,8 +180,7 @@ def load_data(
try:
file_type = file_name.split(".")[-1]
except AttributeError:
raise Exception(
"file_name should be a string type")
raise Exception("file_name should be a string type")
reader = READERS.get_a_plugin(file_type, library)
if file_name:

View File

@ -98,7 +98,9 @@ class IOManager(PluginManager):
message = "Please install "
if len(plugins) > 1:
message += ERROR_MESSAGE_FORMATTER % (
self.action, file_type, ",".join(plugins)
self.action,
file_type,
",".join(plugins),
)
else:
message += plugins[0]

View File

@ -84,7 +84,7 @@ class CSVMemoryMapIterator(compact.Iterator):
if bom_header == BOM_BIG_ENDIAN:
self.__endian = BIG_ENDIAN
elif self.__endian == LITTLE_ENDIAN:
line = line[self.__zeros_left_in_2_row:]
line = line[self.__zeros_left_in_2_row :]
if self.__endian == LITTLE_ENDIAN:
line = line.rstrip()
line = line.decode(self.__encoding)
@ -168,14 +168,14 @@ class CSVSheetReader(SheetReader):
ret = service.detect_int_value(csv_cell_text, self.__pep_0515_off)
if ret is None and self.__auto_detect_float:
ret = service.detect_float_value(
csv_cell_text, self.__pep_0515_off,
csv_cell_text,
self.__pep_0515_off,
ignore_nan_text=self.__ignore_nan_text,
default_float_nan=self.__default_float_nan
default_float_nan=self.__default_float_nan,
)
shall_we_ignore_the_conversion = (
(ret in [float("inf"), float("-inf")])
and self.__ignore_infinity
)
ret in [float("inf"), float("-inf")]
) and self.__ignore_infinity
if shall_we_ignore_the_conversion:
ret = None
if ret is None and self.__auto_detect_datetime:

View File

@ -40,9 +40,8 @@ def detect_date_value(cell_text):
def detect_float_value(
cell_text, pep_0515_off=True,
ignore_nan_text=False,
default_float_nan=None):
cell_text, pep_0515_off=True, ignore_nan_text=False, default_float_nan=None
):
should_we_skip_it = (
cell_text.startswith("0") and cell_text.startswith("0.") is False
)

tests/fixtures/force_file_type.txt (vendored, new file, 1 addition)
View File

@ -0,0 +1 @@
1,2,3

View File

@ -1,6 +1,4 @@
from pyexcel_io.sheet import (
SheetReader, SheetWriter, NamedContent
)
from pyexcel_io.sheet import SheetReader, SheetWriter, NamedContent
from pyexcel_io.book import BookWriter
from pyexcel_io.utils import is_empty_array
from nose.tools import raises
@ -28,7 +26,6 @@ class ArrayWriter(SheetWriter):
class TestSheetReader:
@raises(Exception)
def test_abstractness(self):
reader = SheetReader("test")
@ -54,13 +51,13 @@ class TestSheetReader:
def to_array(self):
pass
b = B(name)
b.to_array()
assert b.name == name
class TestSheetWriter:
@raises(NotImplementedError)
def test_abstractness(self):
writer = SheetWriter("te", "st", "abstract")
@ -71,16 +68,12 @@ class TestSheetWriter:
def write_row(self, row):
pass
d = D('t', 'e', 's')
d = D("t", "e", "s")
d.write_row([11, 11])
def test_writer(self):
native_sheet = NamedContent("test", [])
content = [
[1, 2],
[3, 4],
[5, 6]
]
content = [[1, 2], [3, 4], [5, 6]]
writer = ArrayWriter(None, native_sheet, "test")
writer.write_row(content[0])
writer.write_array(content[1:])
@ -88,11 +81,7 @@ class TestSheetWriter:
def test_writer2(self):
native_sheet = NamedContent("test", [])
content = [
[1, 2],
[3, 4],
[5, 6]
]
content = [[1, 2], [3, 4], [5, 6]]
writer = ArrayWriter(None, native_sheet, None)
writer.write_row(content[0])
writer.write_array(content[1:])

View File

@ -10,12 +10,9 @@ from pyexcel_io.sheet import NamedContent
from pyexcel_io.readers.csvr import (
CSVSheetReader,
CSVFileReader,
CSVinMemoryReader
)
from pyexcel_io.writers.csvw import (
CSVFileWriter,
CSVMemoryWriter
CSVinMemoryReader,
)
from pyexcel_io.writers.csvw import CSVFileWriter, CSVMemoryWriter
from pyexcel_io._compact import BytesIO, PY2, StringIO
@ -23,17 +20,9 @@ class TestReaders(TestCase):
def setUp(self):
self.file_type = "csv"
self.test_file = "csv_book." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
with open(self.test_file, 'w') as f:
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with open(self.test_file, "w") as f:
for row in self.data:
f.write(",".join(row) + "\n")
@ -49,7 +38,7 @@ class TestReaders(TestCase):
def test_sheet_memory_reader(self):
io = manager.get_io(self.file_type)
with open(self.test_file, 'r') as f:
with open(self.test_file, "r") as f:
io.write(f.read())
io.seek(0)
r = CSVinMemoryReader(NamedContent(self.file_type, io))
@ -64,25 +53,23 @@ class TestWriter(TestCase):
def setUp(self):
self.file_type = "csv"
self.test_file = "csv_book." + self.file_type
self.data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.result = dedent("""
self.data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
self.result = dedent(
"""
1,2,3
4,5,6
7,8,9
""").strip('\n')
"""
).strip("\n")
def test_sheet_writer(self):
w = CSVFileWriter(self.test_file, None)
for row in self.data:
w.write_row(row)
w.close()
with open(self.test_file, 'r') as f:
content = f.read().replace('\r', '')
self.assertEqual(content.strip('\n'), self.result)
with open(self.test_file, "r") as f:
content = f.read().replace("\r", "")
self.assertEqual(content.strip("\n"), self.result)
def tearDown(self):
os.unlink(self.test_file)
@ -92,16 +79,14 @@ class TestMemoryWriter(TestCase):
def setUp(self):
self.file_type = "csv"
self.test_file = "csv_book." + self.file_type
self.data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.result = dedent("""
self.data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
self.result = dedent(
"""
1,2,3
4,5,6
7,8,9
""").strip('\n')
"""
).strip("\n")
def test_sheet_writer_to_memory(self):
io = manager.get_io(self.file_type)
@ -109,31 +94,23 @@ class TestMemoryWriter(TestCase):
for row in self.data:
w.write_row(row)
w.close()
content = io.getvalue().replace('\r', '')
self.assertEqual(content.strip('\n'), self.result)
content = io.getvalue().replace("\r", "")
self.assertEqual(content.strip("\n"), self.result)
class TestNonUniformCSV(TestCase):
def setUp(self):
self.file_type = "csv"
self.test_file = "csv_book." + self.file_type
self.data = [
["1"],
["4", "5", "6", "", ""],
["", "7"]
]
with open(self.test_file, 'w') as f:
self.data = [["1"], ["4", "5", "6", "", ""], ["", "7"]]
with open(self.test_file, "w") as f:
for row in self.data:
f.write(",".join(row) + "\n")
def test_sheet_file_reader(self):
r = CSVFileReader(NamedContent(self.file_type, self.test_file))
result = list(r.to_array())
self.assertEqual(result, [
[1],
[4, 5, 6],
["", 7]
])
self.assertEqual(result, [[1], [4, 5, 6], ["", 7]])
def tearDown(self):
os.unlink(self.test_file)
@ -141,53 +118,55 @@ class TestNonUniformCSV(TestCase):
def test_utf16_decoding():
test_file = os.path.join("tests", "fixtures", "csv-encoding-utf16.csv")
reader = CSVFileReader(
NamedContent('csv', test_file),
encoding="utf-16")
reader = CSVFileReader(NamedContent("csv", test_file), encoding="utf-16")
content = list(reader.to_array())
if PY2:
content[0] = [s.encode('utf-8') for s in content[0]]
expected = [['Äkkilähdöt', 'Matkakirjoituksia', 'Matkatoimistot']]
content[0] = [s.encode("utf-8") for s in content[0]]
expected = [["Äkkilähdöt", "Matkakirjoituksia", "Matkatoimistot"]]
eq_(content, expected)
def test_utf16_encoding():
content = [[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']]
content = [[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"]]
test_file = "test-utf16-encoding.csv"
writer = CSVFileWriter(
test_file, None,
encoding="utf-16", lineterminator="\n")
test_file, None, encoding="utf-16", lineterminator="\n"
)
writer.write_array(content)
writer.close()
with open(test_file, "rb") as f:
actual = f.read().decode('utf-16')
actual = f.read().decode("utf-16")
if PY2:
actual = actual.encode('utf-8')
eq_(actual, 'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n')
actual = actual.encode("utf-8")
eq_(actual, "Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n")
os.unlink(test_file)
def test_utf16_memory_decoding():
test_content = u'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot'
test_content = BytesIO(test_content.encode('utf-16'))
test_content = u"Äkkilähdöt,Matkakirjoituksia,Matkatoimistot"
test_content = BytesIO(test_content.encode("utf-16"))
reader = CSVinMemoryReader(
NamedContent('csv', test_content),
encoding="utf-16")
NamedContent("csv", test_content), encoding="utf-16"
)
content = list(reader.to_array())
if PY2:
content[0] = [s.encode('utf-8') for s in content[0]]
expected = [['Äkkilähdöt', 'Matkakirjoituksia', 'Matkatoimistot']]
content[0] = [s.encode("utf-8") for s in content[0]]
expected = [["Äkkilähdöt", "Matkakirjoituksia", "Matkatoimistot"]]
eq_(content, expected)
def test_utf16_memory_encoding():
content = [[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']]
content = [[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"]]
io = StringIO()
writer = CSVMemoryWriter(
io, None, lineterminator="\n", single_sheet_in_book=True,
encoding="utf-16")
io,
None,
lineterminator="\n",
single_sheet_in_book=True,
encoding="utf-16",
)
writer.write_array(content)
actual = io.getvalue()
if PY2:
actual = actual.decode('utf-16')
eq_(actual, u'Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n')
actual = actual.decode("utf-16")
eq_(actual, u"Äkkilähdöt,Matkakirjoituksia,Matkatoimistot\n")

View File

@ -6,15 +6,15 @@ from pyexcel_io.database.common import (
DjangoModelImporter,
DjangoModelImportAdapter,
DjangoModelExporter,
DjangoModelExportAdapter
DjangoModelExportAdapter,
)
from pyexcel_io.database.importers.django import (
DjangoModelWriter,
DjangoBookWriter
DjangoBookWriter,
)
from pyexcel_io.database.exporters.django import (
DjangoModelReader,
DjangoBookReader
DjangoBookReader,
)
@ -92,21 +92,13 @@ class FakeExceptionDjangoModel(FakeDjangoModel):
self.raiseException = raiseException
def __call__(self, **keywords):
return Package(raiseException=self.raiseException,
**keywords)
return Package(raiseException=self.raiseException, **keywords)
class TestException:
def setUp(self):
self.data = [
["X", "Y", "Z"],
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
@raises(Exception)
def test_bulk_save_to_django_model_with_exception(self):
@ -129,15 +121,8 @@ class TestException:
class TestSheet:
def setUp(self):
self.data = [
["X", "Y", "Z"],
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
def test_sheet_save_to_django_model(self):
model = FakeDjangoModel()
@ -150,12 +135,7 @@ class TestSheet:
def test_sheet_save_to_django_model_with_empty_array(self):
model = FakeDjangoModel()
data = [
["X", "Y", "Z"],
['', '', ''],
[1, 2, 3],
[4, 5, 6]
]
data = [["X", "Y", "Z"], ["", "", ""], [1, 2, 3], [4, 5, 6]]
adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0]
writer = DjangoModelWriter(None, adapter)
@ -169,6 +149,7 @@ class TestSheet:
def wrapper(row):
row[0] = row[0] + 1
return row
adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0]
adapter.row_initializer = wrapper
@ -176,8 +157,8 @@ class TestSheet:
writer.write_array(self.data[1:])
writer.close()
assert model.objects.objs == [
{'Y': 2, 'X': 2, 'Z': 3},
{'Y': 5, 'X': 5, 'Z': 6}
{"Y": 2, "X": 2, "Z": 3},
{"Y": 5, "X": 5, "Z": 6},
]
def test_sheet_save_to_django_model_skip_me(self):
@ -188,15 +169,14 @@ class TestSheet:
return None
else:
return row
adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0]
adapter.row_initializer = wrapper
writer = DjangoModelWriter(None, adapter)
writer.write_array(self.data[1:])
writer.close()
assert model.objects.objs == [
{'Y': 2, 'X': 1, 'Z': 3}
]
assert model.objects.objs == [{"Y": 2, "X": 1, "Z": 3}]
def test_load_sheet_from_django_model(self):
model = FakeDjangoModel()
@ -204,8 +184,9 @@ class TestSheet:
adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0]
importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]},
file_type=DB_DJANGO)
save_data(
importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert model.objects.objs == self.result
model._meta.update(["X", "Y", "Z"])
reader = DjangoModelReader(model)
@ -218,30 +199,23 @@ class TestSheet:
adapter = DjangoModelImportAdapter(model)
adapter.column_names = self.data[0]
importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]},
file_type=DB_DJANGO)
save_data(
importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert model.objects.objs == self.result
model._meta.update(["X", "Y", "Z"])
def row_renderer(row):
return [str(element) for element in row]
# the key point of this test case
reader = DjangoModelReader(model,
row_renderer=row_renderer)
reader = DjangoModelReader(model, row_renderer=row_renderer)
data = reader.to_array()
expected = [
["X", "Y", "Z"],
['1', '2', '3'],
['4', '5', '6']
]
expected = [["X", "Y", "Z"], ["1", "2", "3"], ["4", "5", "6"]]
eq_(list(data), expected)
def test_mapping_array(self):
data2 = [
["A", "B", "C"],
[1, 2, 3],
[4, 5, 6]
]
data2 = [["A", "B", "C"], [1, 2, 3], [4, 5, 6]]
model = FakeDjangoModel()
adapter = DjangoModelImportAdapter(model)
adapter.column_names = data2[0]
@ -252,16 +226,8 @@ class TestSheet:
eq_(model.objects.objs, self.result)
def test_mapping_dict(self):
data2 = [
["A", "B", "C"],
[1, 2, 3],
[4, 5, 6]
]
mapdict = {
"C": "Z",
"A": "X",
"B": "Y"
}
data2 = [["A", "B", "C"], [1, 2, 3], [4, 5, 6]]
mapdict = {"C": "Z", "A": "X", "B": "Y"}
model = FakeDjangoModel()
adapter = DjangoModelImportAdapter(model)
adapter.column_names = data2[0]
@ -281,16 +247,22 @@ class TestSheet:
class TestMultipleModels:
def setUp(self):
self.content = OrderedDict()
self.content.update({
"Sheet1": [[u'X', u'Y', u'Z'], [1, 4, 7], [2, 5, 8], [3, 6, 9]]})
self.content.update({
"Sheet2": [[u'A', u'B', u'C'], [1, 4, 7], [2, 5, 8], [3, 6, 9]]})
self.result1 = [{'Y': 4, 'X': 1, 'Z': 7},
{'Y': 5, 'X': 2, 'Z': 8},
{'Y': 6, 'X': 3, 'Z': 9}]
self.result2 = [{'B': 4, 'A': 1, 'C': 7},
{'B': 5, 'A': 2, 'C': 8},
{'B': 6, 'A': 3, 'C': 9}]
self.content.update(
{"Sheet1": [[u"X", u"Y", u"Z"], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}
)
self.content.update(
{"Sheet2": [[u"A", u"B", u"C"], [1, 4, 7], [2, 5, 8], [3, 6, 9]]}
)
self.result1 = [
{"Y": 4, "X": 1, "Z": 7},
{"Y": 5, "X": 2, "Z": 8},
{"Y": 6, "X": 3, "Z": 9},
]
self.result2 = [
{"B": 4, "A": 1, "C": 7},
{"B": 5, "A": 2, "C": 8},
{"B": 6, "A": 3, "C": 9},
]
def test_save_to_more_models(self):
sample_size = 10
@ -298,14 +270,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel()
importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0]
adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0]
adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1)
importer.append(adapter2)
to_store = {
adapter1.get_name(): self.content['Sheet1'][1:],
adapter2.get_name(): self.content['Sheet2'][1:]
adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content["Sheet2"][1:],
}
writer = DjangoBookWriter()
writer.open_content(importer, batch_size=sample_size)
@ -321,14 +293,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel()
importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0]
adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0]
adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1)
importer.append(adapter2)
to_store = {
adapter1.get_name(): self.content['Sheet1'][1:],
adapter2.get_name(): self.content['Sheet2'][1:]
adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content["Sheet2"][1:],
}
writer = DjangoBookWriter()
writer.open_content(importer, batch_size=sample_size, bulk_save=False)
@ -341,14 +313,14 @@ class TestMultipleModels:
model2 = FakeDjangoModel()
importer = DjangoModelImporter()
adapter1 = DjangoModelImportAdapter(model1)
adapter1.column_names = self.content['Sheet1'][0]
adapter1.column_names = self.content["Sheet1"][0]
adapter2 = DjangoModelImportAdapter(model2)
adapter2.column_names = self.content['Sheet2'][0]
adapter2.column_names = self.content["Sheet2"][0]
importer.append(adapter1)
importer.append(adapter2)
to_store = {
adapter1.get_name(): self.content['Sheet1'][1:],
adapter2.get_name(): self.content['Sheet2'][1:]
adapter1.get_name(): self.content["Sheet1"][1:],
adapter2.get_name(): self.content["Sheet2"][1:],
}
save_data(importer, to_store, file_type=DB_DJANGO)
assert model1.objects.objs == self.result1
@ -374,11 +346,11 @@ class TestMultipleModels:
model1 = FakeDjangoModel()
importer = DjangoModelImporter()
adapter = DjangoModelImportAdapter(model1)
adapter.column_names = self.content['Sheet1'][0]
adapter.column_names = self.content["Sheet1"][0]
importer.append(adapter)
to_store = {
adapter.get_name(): self.content['Sheet1'][1:],
"Sheet2": self.content['Sheet2'][1:]
adapter.get_name(): self.content["Sheet1"][1:],
"Sheet2": self.content["Sheet2"][1:],
}
save_data(importer, to_store, file_type=DB_DJANGO)
assert model1.objects.objs == self.result1
@ -390,7 +362,7 @@ class TestMultipleModels:
reader = DjangoBookReader()
reader.open_content(exporter)
data = reader.read_all()
assert list(data['Sheet1']) == self.content['Sheet1']
assert list(data["Sheet1"]) == self.content["Sheet1"]
@raises(TypeError)
@ -407,29 +379,23 @@ def test_not_implemented_method_2():
class TestFilter:
def setUp(self):
self.data = [
["X", "Y", "Z"],
[1, 2, 3],
[4, 5, 6]
]
self.result = [
{'Y': 2, 'X': 1, 'Z': 3},
{'Y': 5, 'X': 4, 'Z': 6}
]
self.data = [["X", "Y", "Z"], [1, 2, 3], [4, 5, 6]]
self.result = [{"Y": 2, "X": 1, "Z": 3}, {"Y": 5, "X": 4, "Z": 6}]
self.model = FakeDjangoModel()
importer = DjangoModelImporter()
adapter = DjangoModelImportAdapter(self.model)
adapter.column_names = self.data[0]
importer.append(adapter)
save_data(importer, {adapter.get_name(): self.data[1:]},
file_type=DB_DJANGO)
save_data(
importer, {adapter.get_name(): self.data[1:]}, file_type=DB_DJANGO
)
assert self.model.objects.objs == self.result
self.model._meta.update(["X", "Y", "Z"])
def test_load_sheet_from_django_model_with_filter(self):
reader = DjangoModelReader(self.model, start_row=0, row_limit=2)
data = reader.to_array()
expected = [['X', 'Y', 'Z'], [1, 2, 3]]
expected = [["X", "Y", "Z"], [1, 2, 3]]
eq_(list(data), expected)
def test_load_sheet_from_django_model_with_filter_1(self):
@ -441,18 +407,18 @@ class TestFilter:
def test_load_sheet_from_django_model_with_filter_2(self):
reader = DjangoModelReader(self.model, start_column=1)
data = reader.to_array()
expected = [['Y', 'Z'], [2, 3], [5, 6]]
expected = [["Y", "Z"], [2, 3], [5, 6]]
eq_(list(data), expected)
def test_load_sheet_from_django_model_with_filter_3(self):
reader = DjangoModelReader(self.model, start_column=1, column_limit=1)
data = reader.to_array()
expected = [['Y'], [2], [5]]
expected = [["Y"], [2], [5]]
eq_(list(data), expected)
def test_django_model_import_adapter():
adapter = DjangoModelImportAdapter(FakeDjangoModel)
adapter.column_names = ['a']
adapter.column_names = ["a"]
adapter.row_initializer = "abc"
eq_(adapter.row_initializer, "abc")

View File

@ -7,17 +7,18 @@ import pyexcel_io.constants as constants
def test_index_filter():
current_index, start, limit, expected = (0, 1, -1,
constants.SKIP_DATA)
current_index, start, limit, expected = (0, 1, -1, constants.SKIP_DATA)
eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (2, 1, -1,
constants.TAKE_DATA)
current_index, start, limit, expected = (2, 1, -1, constants.TAKE_DATA)
eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (2, 1, 10,
constants.TAKE_DATA)
current_index, start, limit, expected = (2, 1, 10, constants.TAKE_DATA)
eq_(_index_filter(current_index, start, limit), expected)
current_index, start, limit, expected = (100, 1, 10,
constants.STOP_ITERATION)
current_index, start, limit, expected = (
100,
1,
10,
constants.STOP_ITERATION,
)
eq_(_index_filter(current_index, start, limit), expected)
@ -30,7 +31,7 @@ class TestFilter:
[3, 23, 33],
[4, 24, 34],
[5, 25, 35],
[6, 26, 36]
[6, 26, 36],
]
save_data(self.test_file, sample)
@ -46,26 +47,29 @@ class TestFilter:
def test_filter_column(self):
filtered_data = get_data(self.test_file, start_column=1)
expected = [[21, 31], [22, 32], [23, 33],
[24, 34], [25, 35], [26, 36]]
expected = [[21, 31], [22, 32], [23, 33], [24, 34], [25, 35], [26, 36]]
eq_(filtered_data[self.test_file], expected)
def test_filter_column_2(self):
filtered_data = get_data(self.test_file,
start_column=1, column_limit=1)
filtered_data = get_data(
self.test_file, start_column=1, column_limit=1
)
expected = [[21], [22], [23], [24], [25], [26]]
eq_(filtered_data[self.test_file], expected)
def test_filter_both_ways(self):
filtered_data = get_data(self.test_file,
start_column=1, start_row=3)
filtered_data = get_data(self.test_file, start_column=1, start_row=3)
expected = [[24, 34], [25, 35], [26, 36]]
eq_(filtered_data[self.test_file], expected)
def test_filter_both_ways_2(self):
filtered_data = get_data(self.test_file,
start_column=1, column_limit=1,
start_row=3, row_limit=1)
filtered_data = get_data(
self.test_file,
start_column=1,
column_limit=1,
start_row=3,
row_limit=1,
)
expected = [[24]]
eq_(filtered_data[self.test_file], expected)

View File

@ -18,8 +18,7 @@ PY2 = sys.version_info[0] == 2
def test_force_file_type():
test_file = "force_file_type.txt"
data = get_data(
os.path.join("tests", "fixtures", test_file),
force_file_type="csv"
os.path.join("tests", "fixtures", test_file), force_file_type="csv"
)
expected = [[1, 2, 3]]
eq_(expected, data[test_file])
@ -75,7 +74,7 @@ def test_load_ods_data_from_memory():
def test_write_xlsx_data_to_memory():
data = {'Sheet': [[1]]}
data = {"Sheet": [[1]]}
io = BytesIO()
msg = "Please install one of these plugins for write data in 'xlsx': "
msg += "pyexcel-xlsx,pyexcel-xlsxw"
@ -120,7 +119,7 @@ def test_writer_csvz_data_from_memory():
if not PY2:
io = StringIO()
writer = get_writer(io, file_type="csvz")
writer.write({'adb': [[2, 3]]})
writer.write({"adb": [[2, 3]]})
else:
raise Exception("pass it")
@ -135,7 +134,7 @@ def test_writer_xlsm_data_from_memory2():
def test_writer_unknown_data_from_memory2():
io = BytesIO()
# mock it
manager.register_stream_type('unknown1', 'text')
manager.register_stream_type("unknown1", "text")
get_writer(io, file_type="unknown1")
@ -148,115 +147,115 @@ def test_get_io_type():
t = manager.get_io_type("hello")
assert t is None
t = manager.get_io_type("csv")
eq_(t, 'string')
eq_(t, "string")
t = manager.get_io_type("xls")
eq_(t, 'bytes')
eq_(t, "bytes")
def test_default_csv_format():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("csv")
# test default format for saving is 'csv'
save_data(io, data)
io.seek(0)
# test default format for reading is 'csv'
result = get_data(io)
assert result['csv'] == [[1, 2, 3]]
assert result["csv"] == [[1, 2, 3]]
def test_case_insentivity():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("CSV")
# test default format for saving is 'csv'
save_data(io, data)
io.seek(0)
# test default format for reading is 'csv'
result = get_data(io)
assert result['csv'] == [[1, 2, 3]]
assert result["csv"] == [[1, 2, 3]]
def test_file_handle_as_input():
test_file = "file_handle.csv"
with open(test_file, 'w') as f:
with open(test_file, "w") as f:
f.write("1,2,3")
with open(test_file, 'r') as f:
data = get_data(f, 'csv')
eq_(data['csv'], [[1, 2, 3]])
with open(test_file, "r") as f:
data = get_data(f, "csv")
eq_(data["csv"], [[1, 2, 3]])
def test_file_type_case_insensitivity():
test_file = "file_handle.CSv"
with open(test_file, 'w') as f:
with open(test_file, "w") as f:
f.write("1,2,3")
with open(test_file, 'r') as f:
data = get_data(f, 'csv')
eq_(data['csv'], [[1, 2, 3]])
with open(test_file, "r") as f:
data = get_data(f, "csv")
eq_(data["csv"], [[1, 2, 3]])
def test_file_handle_as_output():
test_file = "file_handle.csv"
with open(test_file, 'w') as f:
save_data(f, [[1, 2, 3]], 'csv', lineterminator='\n')
with open(test_file, "w") as f:
save_data(f, [[1, 2, 3]], "csv", lineterminator="\n")
with open(test_file, 'r') as f:
with open(test_file, "r") as f:
content = f.read()
eq_(content, '1,2,3\n')
eq_(content, "1,2,3\n")
def test_binary_file_content():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("csvz")
save_data(io, data, 'csvz')
result = get_data(io.getvalue(), 'csvz')
eq_(result['pyexcel_sheet1'], [[1, 2, 3]])
save_data(io, data, "csvz")
result = get_data(io.getvalue(), "csvz")
eq_(result["pyexcel_sheet1"], [[1, 2, 3]])
def test_text_file_content():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("csv")
save_data(io, data, 'csv')
result = get_data(io.getvalue(), 'csv')
eq_(result['csv'], [[1, 2, 3]])
save_data(io, data, "csv")
result = get_data(io.getvalue(), "csv")
eq_(result["csv"], [[1, 2, 3]])
def test_library_parameter():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("csv")
save_data(io, data, 'csv', library="pyexcel-io")
result = get_data(io.getvalue(), 'csv', library="pyexcel-io")
eq_(result['csv'], [[1, 2, 3]])
save_data(io, data, "csv", library="pyexcel-io")
result = get_data(io.getvalue(), "csv", library="pyexcel-io")
eq_(result["csv"], [[1, 2, 3]])
@raises(Exception)
def test_library_parameter_error_situation():
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
io = manager.get_io("csv")
save_data(io, data, 'csv', library="doesnot-exist")
save_data(io, data, "csv", library="doesnot-exist")
def test_conversion_from_bytes_to_text():
test_file = "conversion.csv"
data = [['1', '2', '3']]
data = [["1", "2", "3"]]
save_data(test_file, data)
with open(test_file, "rb") as f:
content = f.read()
result = get_data(content, 'csv')
assert result['csv'] == [[1, 2, 3]]
result = get_data(content, "csv")
assert result["csv"] == [[1, 2, 3]]
os.unlink(test_file)
def test_is_string():
if PY2:
assert is_string(type(u'a')) is True
assert is_string(type(u"a")) is True
else:
assert is_string(type('a')) is True
assert is_string(type("a")) is True
def test_generator_is_obtained():
data, reader = iget_data(os.path.join("tests", "fixtures", "test.csv"))
assert isinstance(data['test.csv'], types.GeneratorType)
assert isinstance(data["test.csv"], types.GeneratorType)
reader.close()
@ -268,27 +267,19 @@ def test_generator_can_be_written():
assert os.path.exists(test_filename)
data2 = get_data(test_filename)
expected = get_data(test_fixture)
assert data2[test_filename] == expected['test.csv']
assert data2[test_filename] == expected["test.csv"]
os.unlink(test_filename)
class TestReadMultipleSheets(TestCase):
file_type = "csv"
delimiter = ','
delimiter = ","
def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data})
self.sheets.update({"sheet2": self.data})
@ -300,24 +291,24 @@ class TestReadMultipleSheets(TestCase):
index = 0
for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index)
with open(file_name, 'w') as f:
with open(file_name, "w") as f:
for row in value:
f.write(self.delimiter.join(row) + "\n")
index = index + 1
def test_sheet_name(self):
sheets = get_data(self.merged_book_file, sheet_name="sheet1")
eq_(sheets['sheet1'], self.expected_sheets['sheet1'])
eq_(sheets["sheet1"], self.expected_sheets["sheet1"])
def test_sheet_index(self):
sheets = get_data(self.merged_book_file, sheet_index=1)
eq_(sheets['sheet2'], self.expected_sheets['sheet2'])
eq_(sheets["sheet2"], self.expected_sheets["sheet2"])
def test_read_many(self):
sheets = get_data(self.merged_book_file, sheets=['sheet1', 2])
eq_(sheets['sheet1'], self.expected_sheets['sheet1'])
eq_(sheets['sheet3'], self.expected_sheets['sheet3'])
assert 'sheet2' not in sheets
sheets = get_data(self.merged_book_file, sheets=["sheet1", 2])
eq_(sheets["sheet1"], self.expected_sheets["sheet1"])
eq_(sheets["sheet3"], self.expected_sheets["sheet3"])
assert "sheet2" not in sheets
def tearDown(self):
index = 0

View File

@ -8,18 +8,12 @@ from pyexcel_io import get_data, save_data
from pyexcel_io._compact import PY26
import pyexcel as p
IN_TRAVIS = 'TRAVIS' in os.environ
IN_TRAVIS = "TRAVIS" in os.environ
def test_issue_8():
test_file = "test_issue_8.csv"
data = [
[1, 2],
[],
[],
[],
[3, 4]
]
data = [[1, 2], [], [], [], [3, 4]]
save_data(test_file, data)
written_data = get_data(test_file, skip_empty_rows=False)
eq_(data, written_data[test_file])
@ -29,21 +23,21 @@ def test_issue_8():
def test_issue_20():
test_file = get_fixture("issue20.csv")
data = get_data(test_file)
expected = [[u'to', u'infinity', u'and', u'beyond']]
eq_(data['issue20.csv'], expected)
expected = [[u"to", u"infinity", u"and", u"beyond"]]
eq_(data["issue20.csv"], expected)
def test_issue_23():
test_file = get_fixture("issue23.csv")
data = get_data(test_file)
expected = [
[8204235414504252, u'inf'],
[82042354145042521, u'-inf'],
[8204235414504252, u"inf"],
[82042354145042521, u"-inf"],
[820423541450425216, 0],
[820423541450425247, 1],
[8204235414504252490, 1.1]
[8204235414504252490, 1.1],
]
eq_(data['issue23.csv'], expected)
eq_(data["issue23.csv"], expected)
# def test_issue_28():
@ -62,24 +56,26 @@ def test_issue_33_34():
pass
else:
import mmap
test_file = get_fixture("issue20.csv")
with open(test_file, 'r+b') as f:
with open(test_file, "r+b") as f:
memory_mapped_file = mmap.mmap(
f.fileno(), 0, access=mmap.ACCESS_READ)
data = get_data(memory_mapped_file, file_type='csv')
expected = [[u'to', u'infinity', u'and', u'beyond']]
eq_(data['csv'], expected)
f.fileno(), 0, access=mmap.ACCESS_READ
)
data = get_data(memory_mapped_file, file_type="csv")
expected = [[u"to", u"infinity", u"and", u"beyond"]]
eq_(data["csv"], expected)
def test_issue_30_utf8_BOM_header():
content = [[u'人有悲歡離合', u'月有陰晴圓缺']]
content = [[u"人有悲歡離合", u"月有陰晴圓缺"]]
test_file = "test-utf8-BOM.csv"
save_data(test_file, content, encoding="utf-8-sig", lineterminator="\n")
custom_encoded_content = get_data(test_file, encoding="utf-8-sig")
assert custom_encoded_content[test_file] == content
with open(test_file, "rb") as f:
content = f.read()
assert content[0:3] == b'\xef\xbb\xbf'
assert content[0:3] == b"\xef\xbb\xbf"
os.unlink(test_file)
@ -87,96 +83,100 @@ def test_issue_33_34_utf32_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-32')
check_mmap_encoding("utf-32")
def test_issue_33_34_utf32be_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-32-be')
check_mmap_encoding("utf-32-be")
def test_issue_33_34_utf32le_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-32-le')
check_mmap_encoding("utf-32-le")
def test_issue_33_34_utf16_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-16')
check_mmap_encoding("utf-16")
def test_issue_33_34_utf16be_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-16-be')
check_mmap_encoding("utf-16-be")
def test_issue_33_34_utf16le_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-16-le')
check_mmap_encoding("utf-16-le")
def test_issue_33_34_utf8_encoded_file():
if PY26:
pass
else:
check_mmap_encoding('utf-8')
check_mmap_encoding("utf-8")
def check_mmap_encoding(encoding):
import mmap
content = [
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot'],
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']]
[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
]
test_file = "test-%s-encoding-in-mmap-file.csv" % encoding
save_data(test_file, content, encoding=encoding)
with open(test_file, 'r+b') as f:
memory_mapped_file = mmap.mmap(
f.fileno(), 0, access=mmap.ACCESS_READ)
data = get_data(memory_mapped_file,
file_type='csv', encoding=encoding)
eq_(data['csv'], content)
with open(test_file, "r+b") as f:
memory_mapped_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
data = get_data(memory_mapped_file, file_type="csv", encoding=encoding)
eq_(data["csv"], content)
os.unlink(test_file)
def test_issue_35_encoding_for_file_content():
encoding = 'utf-16'
encoding = "utf-16"
content = [
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot'],
[u'Äkkilähdöt', u'Matkakirjoituksia', u'Matkatoimistot']]
[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
[u"Äkkilähdöt", u"Matkakirjoituksia", u"Matkatoimistot"],
]
test_file = "test-%s-encoding-in-mmap-file.csv" % encoding
save_data(test_file, content, encoding=encoding)
with open(test_file, 'r+b') as f:
with open(test_file, "r+b") as f:
csv_content = f.read()
data = get_data(csv_content, file_type='csv', encoding=encoding)
eq_(data['csv'], content)
data = get_data(csv_content, file_type="csv", encoding=encoding)
eq_(data["csv"], content)
os.unlink(test_file)
def test_issue_43():
#if not IN_TRAVIS:
# if not IN_TRAVIS:
# raise SkipTest()
p.get_book(url="https://github.com/pyexcel/pyexcel-xls/raw/master/tests/fixtures/file_with_an_empty_sheet.xls"); # flake8: noqa
p.get_book(
url="https://github.com/pyexcel/pyexcel-xls/raw/master/tests/fixtures/file_with_an_empty_sheet.xls"
)
# flake8: noqa
def test_pyexcel_issue_138():
array = [['123_122', '123_1.', '123_1.0']]
save_data('test.csv', array)
data = get_data('test.csv')
expected = [['123_122', '123_1.', '123_1.0']]
eq_(data['test.csv'], expected)
os.unlink('test.csv')
array = [["123_122", "123_1.", "123_1.0"]]
save_data("test.csv", array)
data = get_data("test.csv")
expected = [["123_122", "123_1.", "123_1.0"]]
eq_(data["test.csv"], expected)
os.unlink("test.csv")
def get_fixture(file_name):

View File

@ -11,23 +11,15 @@ from pyexcel_io.writers.tsv import TSVBookWriter
class TestCSVReaders(TestCase):
file_type = 'csv'
file_type = "csv"
reader_class = CSVBookReader
delimiter = ','
delimiter = ","
def setUp(self):
self.test_file = "csv_book." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
with open(self.test_file, 'w') as f:
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with open(self.test_file, "w") as f:
for row in self.data:
f.write(self.delimiter.join(row) + "\n")
@ -39,7 +31,7 @@ class TestCSVReaders(TestCase):
def test_book_reader_from_memory_source(self):
io = manager.get_io(self.file_type)
with open(self.test_file, 'r') as f:
with open(self.test_file, "r") as f:
io.write(f.read())
io.seek(0)
b = self.reader_class()
@ -54,27 +46,19 @@ class TestCSVReaders(TestCase):
class TestTSVReaders(TestCSVReaders):
file_type = "tsv"
reader_class = TSVBookReader
delimiter = '\t'
delimiter = "\t"
class TestReadMultipleSheets(TestCase):
file_type = "csv"
reader_class = CSVBookReader
delimiter = ','
delimiter = ","
def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.expected_data = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
self.expected_data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data})
self.sheets.update({"sheet2": self.data})
@ -86,7 +70,7 @@ class TestReadMultipleSheets(TestCase):
index = 0
for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index)
with open(file_name, 'w') as f:
with open(file_name, "w") as f:
for row in value:
f.write(self.delimiter.join(row) + "\n")
index = index + 1
@ -103,8 +87,9 @@ class TestReadMultipleSheets(TestCase):
b = self.reader_class()
b.open(self.merged_book_file)
sheets = b.read_sheet_by_name("sheet1")
self.assertEqual(list(sheets["sheet1"]),
self.expected_sheets["sheet1"])
self.assertEqual(
list(sheets["sheet1"]), self.expected_sheets["sheet1"]
)
@raises(ValueError)
def test_read_one_from_many_by_non_existent_name(self):
@ -116,8 +101,9 @@ class TestReadMultipleSheets(TestCase):
b = self.reader_class()
b.open(self.merged_book_file)
sheets = b.read_sheet_by_index(1)
self.assertEqual(list(sheets["sheet2"]),
self.expected_sheets["sheet2"])
self.assertEqual(
list(sheets["sheet2"]), self.expected_sheets["sheet2"]
)
@raises(IndexError)
def test_read_one_from_many_by_wrong_index(self):
@ -134,31 +120,38 @@ class TestReadMultipleSheets(TestCase):
class TestTSVBookReaders(TestReadMultipleSheets):
file_type = 'tsv'
file_type = "tsv"
reader_class = TSVBookReader
delimiter = '\t'
delimiter = "\t"
class TestWriteMultipleSheets(TestCase):
file_type = "csv"
writer_class = CSVBookWriter
reader_class = CSVBookReader
result1 = dedent("""
result1 = dedent(
"""
1,2,3
4,5,6
7,8,9
""").strip('\n')
result2 = dedent("""
"""
).strip("\n")
result2 = dedent(
"""
1,2,3
4,5,6
7,8,1000
""").strip('\n')
result3 = dedent("""
"""
).strip("\n")
result3 = dedent(
"""
1,2,3
4,5,6888
7,8,9
""").strip('\n')
merged = dedent("""\
"""
).strip("\n")
merged = dedent(
"""\
---pyexcel:sheet1---
1,2,3
4,5,6
@ -174,41 +167,18 @@ class TestWriteMultipleSheets(TestCase):
4,5,6888
7,8,9
---pyexcel---
""")
"""
)
def setUp(self):
self.test_file_formatter = "csv_multiple__%s__%s." + self.file_type
self.merged_book_file = "csv_multiple." + self.file_type
self.data1 = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.data2 = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "1000"]
]
self.data3 = [
["1", "2", "3"],
["4", "5", "6888"],
["7", "8", "9"]
]
self.expected_data1 = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]
self.expected_data2 = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 1000]
]
self.expected_data3 = [
[1, 2, 3],
[4, 5, 6888],
[7, 8, 9]
]
self.data1 = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
self.data2 = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "1000"]]
self.data3 = [["1", "2", "3"], ["4", "5", "6888"], ["7", "8", "9"]]
self.expected_data1 = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
self.expected_data2 = [[1, 2, 3], [4, 5, 6], [7, 8, 1000]]
self.expected_data3 = [[1, 2, 3], [4, 5, 6888], [7, 8, 9]]
self.sheets = OrderedDict()
self.sheets.update({"sheet1": self.data1})
self.sheets.update({"sheet2": self.data2})
@ -230,16 +200,16 @@ class TestWriteMultipleSheets(TestCase):
index = 0
for key, value in self.sheets.items():
file_name = self.test_file_formatter % (key, index)
with open(file_name, 'r') as f:
content = f.read().replace('\r', '')
assert content.strip('\n') == self.result_dict[key]
with open(file_name, "r") as f:
content = f.read().replace("\r", "")
assert content.strip("\n") == self.result_dict[key]
index = index + 1
self.delete_files()
def test_multiple_sheet_into_memory(self):
io = manager.get_io(self.file_type)
w = self.writer_class()
w.open(io, lineterminator='\n')
w.open(io, lineterminator="\n")
w.write(self.sheets)
w.close()
content = io.getvalue()
@ -249,11 +219,11 @@ class TestWriteMultipleSheets(TestCase):
"""Write csv book into a single stream"""
io = manager.get_io(self.file_type)
w = self.writer_class()
w.open(io, lineterminator='\n')
w.open(io, lineterminator="\n")
w.write(self.sheets)
w.close()
reader = self.reader_class()
reader.open_stream(io, lineterminator='\n', multiple_sheets=True)
reader.open_stream(io, lineterminator="\n", multiple_sheets=True)
sheets = reader.read_all()
for sheet in sheets:
sheets[sheet] = list(sheets[sheet])
@ -268,25 +238,32 @@ class TestWriteMultipleSheets(TestCase):
class TestTSVWriteMultipleSheets(TestWriteMultipleSheets):
file_type = 'tsv'
file_type = "tsv"
writer_class = TSVBookWriter
reader_class = TSVBookReader
result1 = dedent("""
result1 = dedent(
"""
1\t2\t3
4\t5\t6
7\t8\t9
""").strip('\n')
result2 = dedent("""
"""
).strip("\n")
result2 = dedent(
"""
1\t2\t3
4\t5\t6
7\t8\t1000
""").strip('\n')
result3 = dedent("""
"""
).strip("\n")
result3 = dedent(
"""
1\t2\t3
4\t5\t6888
7\t8\t9
""").strip('\n')
merged = dedent("""\
"""
).strip("\n")
merged = dedent(
"""\
---pyexcel:sheet1---
1\t2\t3
4\t5\t6
@ -302,65 +279,64 @@ class TestTSVWriteMultipleSheets(TestWriteMultipleSheets):
4\t5\t6888
7\t8\t9
---pyexcel---
""")
"""
)
class TestWriter(TestCase):
file_type = 'csv'
file_type = "csv"
writer_class = CSVBookWriter
result = dedent("""
result = dedent(
"""
1,2,3
4,5,6
7,8,9
""").strip('\n')
"""
).strip("\n")
def setUp(self):
self.test_file = "csv_book." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
def test_book_writer(self):
w = self.writer_class()
w.open(self.test_file)
w.write({None: self.data})
w.close()
with open(self.test_file, 'r') as f:
content = f.read().replace('\r', '')
self.assertEqual(content.strip('\n'), self.result)
with open(self.test_file, "r") as f:
content = f.read().replace("\r", "")
self.assertEqual(content.strip("\n"), self.result)
def tearDown(self):
os.unlink(self.test_file)
class TestTSVWriters(TestWriter):
file_type = 'tsv'
file_type = "tsv"
writer_class = TSVBookWriter
result = dedent("""
result = dedent(
"""
1\t2\t3
4\t5\t6
7\t8\t9
""").strip('\n')
"""
).strip("\n")
class TestMemoryWriter(TestCase):
file_type = "csv"
writer_class = CSVBookWriter
result = dedent("""
result = dedent(
"""
1,2,3
4,5,6
7,8,9
""").strip('\n')
"""
).strip("\n")
def setUp(self):
self.test_file = "csv_book." + self.file_type
self.data = [
["1", "2", "3"],
["4", "5", "6"],
["7", "8", "9"]
]
self.data = [["1", "2", "3"], ["4", "5", "6"], ["7", "8", "9"]]
def test_book_writer_to_memroy(self):
io = manager.get_io(self.file_type)
@ -368,15 +344,17 @@ class TestMemoryWriter(TestCase):
w.open(io, single_sheet_in_book=True)
w.write({self.file_type: self.data})
w.close()
content = io.getvalue().replace('\r', '')
assert content.strip('\n') == self.result
content = io.getvalue().replace("\r", "")
assert content.strip("\n") == self.result
class TestTSVMemoryWriter(TestMemoryWriter):
file_type = 'tsv'
file_type = "tsv"
writer_class = TSVBookWriter
result = dedent("""
result = dedent(
"""
1\t2\t3
4\t5\t6
7\t8\t9
""").strip('\n')
"""
).strip("\n")

View File

@ -11,11 +11,12 @@ from pyexcel_io.writers.tsvz import TSVZipBookWriter
import zipfile
from nose.tools import raises
import sys
PY2 = sys.version_info[0] == 2
class TestCSVZ(TestCase):
file_type = 'csvz'
file_type = "csvz"
writer_class = CSVZipBookWriter
reader_class = CSVZipBookReader
result = u"中,文,1,2,3"
@ -24,23 +25,21 @@ class TestCSVZ(TestCase):
self.file = "csvz." + self.file_type
def test_writing(self):
data = [[u'中', u'文', 1, 2, 3]]
file_name = 'pyexcel_sheet1.' + self.file_type[0:3]
data = [[u"", u"", 1, 2, 3]]
file_name = "pyexcel_sheet1." + self.file_type[0:3]
zipbook = self.writer_class()
zipbook.open(self.file)
zipbook.write({None: data})
zipbook.close()
zip = zipfile.ZipFile(self.file, 'r')
zip = zipfile.ZipFile(self.file, "r")
self.assertEqual(zip.namelist(), [file_name])
content = zip.read(file_name)
content = content.decode('utf-8')
self.assertEqual(
content.replace('\r', '').strip('\n'),
self.result)
content = content.decode("utf-8")
self.assertEqual(content.replace("\r", "").strip("\n"), self.result)
zip.close()
def test_reading(self):
data = [[u'中', u'文', 1, 2, 3]]
data = [[u"中", u"文", 1, 2, 3]]
zipbook = self.writer_class()
zipbook.open(self.file)
zipbook.write({None: data})
@ -48,8 +47,7 @@ class TestCSVZ(TestCase):
zipreader = self.reader_class()
zipreader.open(self.file)
data = zipreader.read_all()
self.assertEqual(
list(data['pyexcel_sheet1']), [[u'中', u'文', 1, 2, 3]])
self.assertEqual(list(data["pyexcel_sheet1"]), [[u"中", u"文", 1, 2, 3]])
zipreader.close()
def tearDown(self):
@ -57,7 +55,7 @@ class TestCSVZ(TestCase):
class TestTSVZ(TestCSVZ):
file_type = 'tsvz'
file_type = "tsvz"
writer_class = TSVZipBookWriter
reader_class = TSVZipBookReader
result = u"\t\t1\t2\t3"
@ -73,7 +71,7 @@ def test_reading_from_memory():
zipreader = CSVZipBookReader()
zipreader.open_stream(io)
data = zipreader.read_all()
assert list(data['pyexcel_sheet1']) == [[1, 2, 3]]
assert list(data["pyexcel_sheet1"]) == [[1, 2, 3]]
def test_reading_from_memory_tsvz():
@ -86,7 +84,7 @@ def test_reading_from_memory_tsvz():
zipreader = TSVZipBookReader()
zipreader.open_stream(io)
data = zipreader.read_all()
assert list(data['pyexcel_sheet1']) == [[1, 2, 3]]
assert list(data["pyexcel_sheet1"]) == [[1, 2, 3]]
class TestMultipleSheet(TestCase):
@ -95,41 +93,24 @@ class TestMultipleSheet(TestCase):
def setUp(self):
self.content = OrderedDict()
self.content.update({
'Sheet 1':
[
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0],
[7.0, 8.0, 9.0]
]
})
self.content.update({
'Sheet 2':
[
['X', 'Y', 'Z'],
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]
]
})
self.content.update({
'Sheet 3':
[
['O', 'P', 'Q'],
[3.0, 2.0, 1.0],
[4.0, 3.0, 2.0]
]
})
self.content.update(
{"Sheet 1": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]}
)
self.content.update(
{"Sheet 2": [["X", "Y", "Z"], [1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]}
)
self.content.update(
{"Sheet 3": [["O", "P", "Q"], [3.0, 2.0, 1.0], [4.0, 3.0, 2.0]]}
)
save_data(self.file_name, self.content)
def test_read_one_from_many_by_name(self):
reader = self.reader_class()
reader.open(self.file_name)
sheets = reader.read_sheet_by_name("Sheet 1")
self.assertEqual(list(sheets['Sheet 1']), [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
self.assertEqual(
list(sheets["Sheet 1"]), [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
)
@raises(ValueError)
def test_read_one_from_many_by_unknown_name(self):
@ -141,11 +122,9 @@ class TestMultipleSheet(TestCase):
reader = self.reader_class()
reader.open(self.file_name)
sheets = reader.read_sheet_by_index(0)
self.assertEqual(list(sheets['Sheet 1']), [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
self.assertEqual(
list(sheets["Sheet 1"]), [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
)
@raises(IndexError)
def test_read_one_from_many_by_unknown_index(self):

View File

@ -10,12 +10,14 @@ from pyexcel_io._compact import text_type
class TestDateFormat(TestCase):
def setUp(self):
self.excel_filename = "testdateformat.csv"
self.data = [[
'01/06/2016',
datetime.date(2014, 12, 25),
datetime.datetime(2014, 12, 25, 11, 11, 11),
datetime.datetime(2014, 12, 25, 11, 11, 11, 10)
]]
self.data = [
[
"01/06/2016",
datetime.date(2014, 12, 25),
datetime.datetime(2014, 12, 25, 11, 11, 11),
datetime.datetime(2014, 12, 25, 11, 11, 11, 10),
]
]
pe.save_as(dest_file_name=self.excel_filename, array=self.data)
def test_auto_detect_float(self):
@ -23,13 +25,17 @@ class TestDateFormat(TestCase):
self.assertEqual(sheet.to_array(), self.data)
def test_auto_detect_float_false(self):
expected = [[
'01/06/2016',
'2014-12-25',
'2014-12-25 11:11:11',
'2014-12-25 11:11:11.000010']]
sheet = pe.get_sheet(file_name=self.excel_filename,
auto_detect_datetime=False)
expected = [
[
"01/06/2016",
"2014-12-25",
"2014-12-25 11:11:11",
"2014-12-25 11:11:11.000010",
]
]
sheet = pe.get_sheet(
file_name=self.excel_filename, auto_detect_datetime=False
)
self.assertEqual(sheet.to_array(), expected)
def tearDown(self):
@ -44,38 +50,46 @@ class TestAutoDetectInt(TestCase):
def test_auto_detect_int(self):
sheet = pe.get_sheet(file_name=self.test_file)
expected = dedent("""
expected = dedent(
"""
test_auto_detect_init.csv:
+---+---+-----+
| 1 | 2 | 3.1 |
+---+---+-----+""").strip()
+---+---+-----+"""
).strip()
self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_int(self):
book = pe.get_book(file_name=self.test_file)
expected = dedent("""
expected = dedent(
"""
test_auto_detect_init.csv:
+---+---+-----+
| 1 | 2 | 3.1 |
+---+---+-----+""").strip()
+---+---+-----+"""
).strip()
self.assertEqual(str(book), expected)
def test_auto_detect_int_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_int=False)
expected = dedent("""
expected = dedent(
"""
test_auto_detect_init.csv:
+-----+-----+-----+
| 1.0 | 2.0 | 3.1 |
+-----+-----+-----+""").strip()
+-----+-----+-----+"""
).strip()
self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_int_false(self):
book = pe.get_book(file_name=self.test_file, auto_detect_int=False)
expected = dedent("""
expected = dedent(
"""
test_auto_detect_init.csv:
+-----+-----+-----+
| 1.0 | 2.0 | 3.1 |
+-----+-----+-----+""").strip()
+-----+-----+-----+"""
).strip()
self.assertEqual(str(book), expected)
def tearDown(self):
@ -96,22 +110,26 @@ class TestAutoDetectFloat(TestCase):
def test_auto_detect_float_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(sheet.to_array(), [[1, '2.0', '3.1']])
expected = dedent("""
self.assertEqual(sheet.to_array(), [[1, "2.0", "3.1"]])
expected = dedent(
"""
test_auto_detect_init.csv:
+---+-----+-----+
| 1 | 2.0 | 3.1 |
+---+-----+-----+""").strip()
+---+-----+-----+"""
).strip()
self.assertEqual(str(sheet), expected)
def test_get_book_auto_detect_float_false(self):
book = pe.get_book(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(book[0].to_array(), [[1, '2.0', '3.1']])
expected = dedent("""
self.assertEqual(book[0].to_array(), [[1, "2.0", "3.1"]])
expected = dedent(
"""
test_auto_detect_init.csv:
+---+-----+-----+
| 1 | 2.0 | 3.1 |
+---+-----+-----+""").strip()
+---+-----+-----+"""
).strip()
self.assertEqual(str(book), expected)
def tearDown(self):
@ -126,24 +144,23 @@ class TestSpecialStrings(TestCase):
"""
def setUp(self):
self.content = [['01', 1, 2.0, 3.1, 'NaN', 'nan']]
self.content = [["01", 1, 2.0, 3.1, "NaN", "nan"]]
self.test_file = "test_auto_detect_init.csv"
pe.save_as(array=self.content, dest_file_name=self.test_file)
def test_auto_detect_float_true(self):
sheet = pe.get_sheet(file_name=self.test_file)
self.assertEqual(sheet.to_array(),
[['01', 1, 2, 3.1, 'NaN', 'nan']])
self.assertEqual(sheet.to_array(), [["01", 1, 2, 3.1, "NaN", "nan"]])
def test_auto_detect_float_false(self):
sheet = pe.get_sheet(file_name=self.test_file, auto_detect_float=False)
self.assertEqual(sheet.to_array(),
[['01', 1, '2.0', '3.1', 'NaN', 'nan']])
self.assertEqual(
sheet.to_array(), [["01", 1, "2.0", "3.1", "NaN", "nan"]]
)
def test_auto_detect_float_ignore_nan_text(self):
sheet = pe.get_sheet(file_name=self.test_file, ignore_nan_text=True)
self.assertEqual(sheet.to_array(),
[['01', 1, 2.0, 3.1, 'NaN', 'nan']])
self.assertEqual(sheet.to_array(), [["01", 1, 2.0, 3.1, "NaN", "nan"]])
def test_auto_detect_float_default_float_nan(self):
sheet = pe.get_sheet(file_name=self.test_file, default_float_nan="nan")

View File

@ -6,19 +6,17 @@ from pyexcel_io import get_data, save_data
class TestRenderer:
def setUp(self):
self.test_file = "test_filter.csv"
sample = [
[1, 21, 31],
[2, 22, 32]
]
sample = [[1, 21, 31], [2, 22, 32]]
save_data(self.test_file, sample)
def test_filter_row(self):
def custom_row_renderer(row):
return [str(element) for element in row]
custom_data = get_data(self.test_file,
row_renderer=custom_row_renderer)
expected = [['1', '21', '31'], ['2', '22', '32']]
custom_data = get_data(
self.test_file, row_renderer=custom_row_renderer
)
expected = [["1", "21", "31"], ["2", "22", "32"]]
eq_(custom_data[self.test_file], expected)
def tearDown(self):

View File

@ -24,7 +24,7 @@ def test_date_util_parse():
def test_issue_8_1():
# https://github.com/pyexcel/pyexcel-ods3/issues/8
result = time_value('PT1111')
result = time_value("PT1111")
eq_(result, None)
@ -50,42 +50,42 @@ def test_fake_date_time_20():
def test_issue_1_error():
result = time_value('PT1111')
result = time_value("PT1111")
eq_(result, None)
def test_detect_int_value():
result = detect_int_value('123')
result = detect_int_value("123")
eq_(result, 123)
def test_detect_float_value():
result = detect_float_value('123.1')
result = detect_float_value("123.1")
eq_(result, 123.1)
def test_suppression_of_pep_0515_int():
result = detect_int_value('123_123')
result = detect_int_value("123_123")
eq_(result, None)
def test_suppression_of_pep_0515_float():
result = detect_float_value('123_123.')
result = detect_float_value("123_123.")
eq_(result, None)
result = detect_float_value('123_123.1')
result = detect_float_value("123_123.1")
eq_(result, None)
def test_detect_float_value_on_nan():
result = detect_float_value('NaN', ignore_nan_text=True)
result = detect_float_value("NaN", ignore_nan_text=True)
eq_(result, None)
def test_detect_float_value_on_custom_nan_text():
result = detect_float_value('NaN', default_float_nan="nan")
result = detect_float_value("NaN", default_float_nan="nan")
eq_(result, None)
def test_detect_float_value_on_custom_nan_text2():
result = detect_float_value('nan', default_float_nan="nan")
result = detect_float_value("nan", default_float_nan="nan")
eq_(str(result), "nan")

View File

@ -4,13 +4,11 @@ import pyexcel_io.constants as constants
class MyWriter(SheetWriter):
def set_size(self, size):
self.native_book = size
class MyReader(SheetReader):
def number_of_rows(self):
return len(self._native_sheet)
@ -39,10 +37,7 @@ def take_second_column(current_index, start, limit=-1):
def test_custom_skip_row_func():
take_second_row = take_second_column
array = [
[1, 2, 3],
[4, 5, 6]
]
array = [[1, 2, 3], [4, 5, 6]]
reader = MyReader(array, skip_row_func=take_second_row)
actual = list(reader.to_array())
expected = [[4, 5, 6]]
@ -51,15 +46,9 @@ def test_custom_skip_row_func():
def test_custom_skip_column_func():
array = [
[1, 2, 3],
[4, 5, 6]
]
array = [[1, 2, 3], [4, 5, 6]]
reader = MyReader(array, skip_column_func=take_second_column)
actual = list(reader.to_array())
expected = [
[2],
[5]
]
expected = [[2], [5]]
eq_(expected, actual)
reader.close()

View File

@ -11,14 +11,17 @@ from pyexcel_io.database.common import (
SQLTableExporter,
SQLTableExportAdapter,
SQLTableImporter,
SQLTableImportAdapter)
SQLTableImportAdapter,
)
from pyexcel_io.database.exporters.sqlalchemy import (
SQLTableReader,
SQLBookReader)
SQLBookReader,
)
from pyexcel_io.database.importers.sqlalchemy import (
PyexcelSQLSkipRowException,
SQLTableWriter,
SQLBookWriter)
SQLBookWriter,
)
from pyexcel_io.database.querysets import QuerysetsReader
from sqlalchemy.orm import relationship, backref
from nose.tools import raises, eq_
@ -30,7 +33,7 @@ PY36 = PY3 and sys.version_info[1] == 6
engine = None
if platform.python_implementation() == 'PyPy':
if platform.python_implementation() == "PyPy":
engine = create_engine("sqlite:///tmp.db")
else:
engine = create_engine("sqlite://")
@ -39,7 +42,7 @@ Base = declarative_base()
class Pyexcel(Base):
__tablename__ = 'pyexcel'
__tablename__ = "pyexcel"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
weight = Column(Float)
@ -47,15 +50,16 @@ class Pyexcel(Base):
class Post(Base):
__tablename__ = 'post'
__tablename__ = "post"
id = Column(Integer, primary_key=True)
title = Column(String(80))
body = Column(String(100))
pub_date = Column(DateTime)
category_id = Column(Integer, ForeignKey('category.id'))
category = relationship('Category',
backref=backref('posts', lazy='dynamic'))
category_id = Column(Integer, ForeignKey("category.id"))
category = relationship(
"Category", backref=backref("posts", lazy="dynamic")
)
def __init__(self, title, body, category, pub_date=None):
self.title = title
@ -66,11 +70,11 @@ class Post(Base):
self.category = category
def __repr__(self):
return '<Post %r>' % self.title
return "<Post %r>" % self.title
class Category(Base):
__tablename__ = 'category'
__tablename__ = "category"
id = Column(Integer, primary_key=True)
name = Column(String(50))
@ -78,7 +82,7 @@ class Category(Base):
self.name = name
def __repr__(self):
return '<Category %r>' % self.name
return "<Category %r>" % self.name
def __str__(self):
return self.__repr__()
@ -91,16 +95,14 @@ class TestSingleRead:
def setUp(self):
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
p1 = Pyexcel(id=0,
name="Adam",
weight=11.25,
birth=datetime.date(2014, 11, 11))
p1 = Pyexcel(
id=0, name="Adam", weight=11.25, birth=datetime.date(2014, 11, 11)
)
self.session = Session()
self.session.add(p1)
p1 = Pyexcel(id=1,
name="Smith",
weight=12.25,
birth=datetime.date(2014, 11, 12))
p1 = Pyexcel(
id=1, name="Smith", weight=12.25, birth=datetime.date(2014, 11, 12)
)
self.session.add(p1)
self.session.commit()
self.session.close()
@ -110,9 +112,9 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel)
data = sheet.to_array()
content = [
['birth', 'id', 'name', 'weight'],
['2014-11-11', 0, 'Adam', 11.25],
['2014-11-12', 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
["2014-11-11", 0, "Adam", 11.25],
["2014-11-12", 1, "Smith", 12.25],
]
# 'pyexcel' here is the table name
assert list(data) == content
@ -123,14 +125,16 @@ class TestSingleRead:
def custom_renderer(row):
return [str(element) for element in row]
# the key point of this test case
sheet = SQLTableReader(mysession, Pyexcel,
row_renderer=custom_renderer)
sheet = SQLTableReader(
mysession, Pyexcel, row_renderer=custom_renderer
)
data = sheet.to_array()
content = [
['birth', 'id', 'name', 'weight'],
['2014-11-11', '0', 'Adam', '11.25'],
['2014-11-12', '1', 'Smith', '12.25']
["birth", "id", "name", "weight"],
["2014-11-11", "0", "Adam", "11.25"],
["2014-11-12", "1", "Smith", "12.25"],
]
eq_(list(data), content)
mysession.close()
@ -140,8 +144,8 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel, start_row=1)
data = sheet.to_array()
content = [
['2014-11-11', 0, 'Adam', 11.25],
['2014-11-12', 1, 'Smith', 12.25]
["2014-11-11", 0, "Adam", 11.25],
["2014-11-12", 1, "Smith", 12.25],
]
# 'pyexcel' here is the table name
assert list(data) == content
@ -149,12 +153,9 @@ class TestSingleRead:
def test_sql_filter_1(self):
mysession = Session()
sheet = SQLTableReader(mysession, Pyexcel,
start_row=1, row_limit=1)
sheet = SQLTableReader(mysession, Pyexcel, start_row=1, row_limit=1)
data = sheet.to_array()
content = [
['2014-11-11', 0, 'Adam', 11.25]
]
content = [["2014-11-11", 0, "Adam", 11.25]]
# 'pyexcel' here is the table name
assert list(data) == content
mysession.close()
@ -164,9 +165,9 @@ class TestSingleRead:
sheet = SQLTableReader(mysession, Pyexcel, start_column=1)
data = sheet.to_array()
content = [
['id', 'name', 'weight'],
[0, 'Adam', 11.25],
[1, 'Smith', 12.25]
["id", "name", "weight"],
[0, "Adam", 11.25],
[1, "Smith", 12.25],
]
# 'pyexcel'' here is the table name
assert list(data) == content
@ -174,14 +175,11 @@ class TestSingleRead:
def test_sql_filter_3(self):
mysession = Session()
sheet = SQLTableReader(mysession, Pyexcel,
start_column=1, column_limit=1)
sheet = SQLTableReader(
mysession, Pyexcel, start_column=1, column_limit=1
)
data = sheet.to_array()
content = [
['id'],
[0],
[1]
]
content = [["id"], [0], [1]]
# 'pyexcel' here is the table name
assert list(data) == content
mysession.close()
@ -192,14 +190,14 @@ class TestSingleWrite:
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
self.data = [
['birth', 'id', 'name', 'weight'],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, "Smith", 12.25],
]
self.results = [
['birth', 'id', 'name', 'weight'],
['2014-11-11', 0, 'Adam', 11.25],
['2014-11-12', 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
["2014-11-11", 0, "Adam", 11.25],
["2014-11-12", 1, "Smith", 12.25],
]
def test_one_table(self):
@ -230,23 +228,24 @@ class TestSingleWrite:
assert list(results) == self.results
# update data using custom initializer
update_data = [
['birth', 'id', 'name', 'weight'],
[datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25],
[datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25]
["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, "Adam_E", 12.25],
[datetime.date(2014, 11, 12), 1, "Smith_E", 11.25],
]
updated_results = [
['birth', 'id', 'name', 'weight'],
['2014-11-11', 0, 'Adam_E', 12.25],
['2014-11-12', 1, 'Smith_E', 11.25]
["birth", "id", "name", "weight"],
["2014-11-11", 0, "Adam_E", 12.25],
["2014-11-12", 1, "Smith_E", 11.25],
]
def row_updater(row):
an_instance = mysession.query(Pyexcel).get(row['id'])
an_instance = mysession.query(Pyexcel).get(row["id"])
if an_instance is None:
an_instance = Pyexcel()
for name in row.keys():
setattr(an_instance, name, row[name])
return an_instance
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = update_data[0]
@ -273,15 +272,16 @@ class TestSingleWrite:
assert list(results) == self.results
# update data using custom initializer
update_data = [
['birth', 'id', 'name', 'weight'],
[datetime.date(2014, 11, 11), 0, 'Adam_E', 12.25],
[datetime.date(2014, 11, 12), 1, 'Smith_E', 11.25]
["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, "Adam_E", 12.25],
[datetime.date(2014, 11, 12), 1, "Smith_E", 11.25],
]
def row_updater(row):
an_instance = mysession.query(Pyexcel).get(row['id'])
an_instance = mysession.query(Pyexcel).get(row["id"])
if an_instance is not None:
raise PyexcelSQLSkipRowException()
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = update_data[0]
@ -297,10 +297,10 @@ class TestSingleWrite:
def test_one_table_with_empty_rows(self):
mysession = Session()
data = [
['birth', 'id', 'name', 'weight'],
['', '', ''],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
["", "", ""],
[datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, "Smith", 12.25],
]
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
@ -316,9 +316,9 @@ class TestSingleWrite:
def test_one_table_with_empty_string_in_unique_field(self):
mysession = Session()
data = [
['birth', 'id', 'name', 'weight'],
[datetime.date(2014, 11, 11), 0, '', 11.25],
[datetime.date(2014, 11, 12), 1, '', 12.25]
["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, "", 11.25],
[datetime.date(2014, 11, 12), 1, "", 12.25],
]
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
@ -328,19 +328,21 @@ class TestSingleWrite:
writer.close()
query_sets = mysession.query(Pyexcel).all()
results = QuerysetsReader(query_sets, data[0]).to_array()
assert list(results) == [['birth', 'id', 'name', 'weight'],
['2014-11-11', 0, None, 11.25],
['2014-11-12', 1, None, 12.25]]
assert list(results) == [
["birth", "id", "name", "weight"],
["2014-11-11", 0, None, 11.25],
["2014-11-12", 1, None, 12.25],
]
mysession.close()
def test_one_table_using_mapdict_as_array(self):
mysession = Session()
self.data = [
["Birth Date", "Id", "Name", "Weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25]
[datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, "Smith", 12.25],
]
mapdict = ['birth', 'id', 'name', 'weight']
mapdict = ["birth", "id", "name", "weight"]
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
@ -358,14 +360,14 @@ class TestSingleWrite:
mysession = Session()
self.data = [
["Birth Date", "Id", "Name", "Weight"],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25]
[datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, "Smith", 12.25],
]
mapdict = {
"Birth Date": 'birth',
"Id": 'id',
"Name": 'name',
"Weight": 'weight'
"Birth Date": "birth",
"Id": "id",
"Name": "name",
"Weight": "weight",
}
importer = SQLTableImporter(mysession)
@ -377,8 +379,8 @@ class TestSingleWrite:
writer.close()
query_sets = mysession.query(Pyexcel).all()
results = QuerysetsReader(
query_sets,
['birth', 'id', 'name', 'weight']).to_array()
query_sets, ["birth", "id", "name", "weight"]
).to_array()
assert list(results) == self.results
mysession.close()
@ -389,44 +391,54 @@ class TestMultipleRead:
Base.metadata.create_all(engine)
self.session = Session()
data = {
"Category": [
["id", "name"],
[1, "News"],
[2, "Sports"]
],
"Category": [["id", "name"], [1, "News"], [2, "Sports"]],
"Post": [
["id", "title", "body", "pub_date", "category"],
[1, "Title A", "formal",
datetime.datetime(2015, 1, 20, 23, 28, 29), "News"],
[2, "Title B", "informal",
datetime.datetime(2015, 1, 20, 23, 28, 30), "Sports"]
]
[
1,
"Title A",
"formal",
datetime.datetime(2015, 1, 20, 23, 28, 29),
"News",
],
[
2,
"Title B",
"informal",
datetime.datetime(2015, 1, 20, 23, 28, 30),
"Sports",
],
],
}
def category_init_func(row):
c = Category(row['name'])
c.id = row['id']
c = Category(row["name"])
c.id = row["id"]
return c
def post_init_func(row):
c = self.session.query(Category).filter_by(
name=row['category']).first()
p = Post(row['title'], row['body'], c, row['pub_date'])
c = (
self.session.query(Category)
.filter_by(name=row["category"])
.first()
)
p = Post(row["title"], row["body"], c, row["pub_date"])
return p
importer = SQLTableImporter(self.session)
category_adapter = SQLTableImportAdapter(Category)
category_adapter.column_names = data['Category'][0]
category_adapter.column_names = data["Category"][0]
category_adapter.row_initializer = category_init_func
importer.append(category_adapter)
post_adapter = SQLTableImportAdapter(Post)
post_adapter.column_names = data['Post'][0]
post_adapter.column_names = data["Post"][0]
post_adapter.row_initializer = post_init_func
importer.append(post_adapter)
writer = SQLBookWriter()
writer.open_content(importer)
to_store = OrderedDict()
to_store.update({category_adapter.get_name(): data['Category'][1:]})
to_store.update({post_adapter.get_name(): data['Post'][1:]})
to_store.update({category_adapter.get_name(): data["Category"][1:]})
to_store.update({post_adapter.get_name(): data["Post"][1:]})
writer.write(to_store)
writer.close()
@ -442,18 +454,21 @@ class TestMultipleRead:
for key in data.keys():
data[key] = list(data[key])
assert json.dumps(data) == (
'{"category": [["id", "name"], [1, "News"], [2, "Sports"]], ' +
'"post": [["body", "category_id", "id", "pub_date", "title"], ' +
'["formal", 1, 1, "2015-01-20T23:28:29", "Title A"], ' +
'["informal", 2, 2, "2015-01-20T23:28:30", "Title B"]]}')
'{"category": [["id", "name"], [1, "News"], [2, "Sports"]], '
+ '"post": [["body", "category_id", "id", "pub_date", "title"], '
+ '["formal", 1, 1, "2015-01-20T23:28:29", "Title A"], '
+ '["informal", 2, 2, "2015-01-20T23:28:30", "Title B"]]}'
)
def test_foreign_key(self):
all_posts = self.session.query(Post).all()
column_names = ['category__name', 'title']
column_names = ["category__name", "title"]
data = list(QuerysetsReader(all_posts, column_names).to_array())
eq_(json.dumps(data),
'[["category__name", "title"], ["News", "Title A"],' +
' ["Sports", "Title B"]]')
eq_(
json.dumps(data),
'[["category__name", "title"], ["News", "Title A"],'
+ ' ["Sports", "Title B"]]',
)
def tearDown(self):
self.session.close()
@ -479,14 +494,14 @@ class TestNoAutoCommit:
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
self.data = [
['birth', 'id', 'name', 'weight'],
[datetime.date(2014, 11, 11), 0, 'Adam', 11.25],
[datetime.date(2014, 11, 12), 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
[datetime.date(2014, 11, 11), 0, "Adam", 11.25],
[datetime.date(2014, 11, 12), 1, "Smith", 12.25],
]
self.results = [
['birth', 'id', 'name', 'weight'],
['2014-11-11', 0, 'Adam', 11.25],
['2014-11-12', 1, 'Smith', 12.25]
["birth", "id", "name", "weight"],
["2014-11-11", 0, "Adam", 11.25],
["2014-11-12", 1, "Smith", 12.25],
]
def test_one_table(self):
@ -498,8 +513,7 @@ class TestNoAutoCommit:
importer = SQLTableImporter(mysession)
adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = self.data[0]
writer = SQLTableWriter(importer, adapter,
auto_commit=False)
writer = SQLTableWriter(importer, adapter, auto_commit=False)
writer.write_array(self.data[1:])
writer.close()
mysession.close()
@ -523,7 +537,7 @@ def test_not_implemented_method_2():
def test_sql_table_import_adapter():
adapter = SQLTableImportAdapter(Pyexcel)
adapter.column_names = ['a']
adapter.column_names = ["a"]
adapter.row_initializer = "abc"
eq_(adapter.row_initializer, "abc")
@ -532,10 +546,10 @@ def test_sql_table_import_adapter():
def test_unknown_sheet(self):
importer = SQLTableImporter(None)
category_adapter = SQLTableImportAdapter(Category)
category_adapter.column_names = ['']
category_adapter.column_names = [""]
importer.append(category_adapter)
writer = SQLBookWriter()
writer.open_content(importer)
to_store = OrderedDict()
to_store.update({'you do not see me': [[]]})
to_store.update({"you do not see me": [[]]})
writer.write(to_store)