996 lines
37 KiB
Python
996 lines
37 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import tablib
|
|
from copy import deepcopy
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from unittest import skip, skipUnless
|
|
|
|
from django import VERSION
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User
|
|
from django.db import IntegrityError, DatabaseError
|
|
from django.db.models import Count
|
|
from django.db.models.fields import FieldDoesNotExist
|
|
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
|
|
from django.utils.html import strip_tags
|
|
from django.utils.encoding import force_text
|
|
|
|
from import_export import fields, resources, results, widgets
|
|
from import_export.instance_loaders import ModelInstanceLoader
|
|
from import_export.resources import Diff
|
|
|
|
from ..models import (
|
|
Author, Book, Category, Entry, Profile, WithDefault, WithDynamicDefault,
|
|
WithFloatField, Person, Role
|
|
)
|
|
|
|
try:
|
|
from collections import OrderedDict
|
|
except ImportError:
|
|
from django.utils.datastructures import SortedDict as OrderedDict
|
|
|
|
|
|
class MyResource(resources.Resource):
|
|
name = fields.Field()
|
|
email = fields.Field()
|
|
extra = fields.Field()
|
|
|
|
class Meta:
|
|
export_order = ('email', 'name')
|
|
|
|
|
|
class ResourceTestCase(TestCase):
|
|
|
|
def setUp(self):
|
|
self.my_resource = MyResource()
|
|
|
|
def test_fields(self):
|
|
"""Check that fields were determined correctly """
|
|
|
|
# check that our fields were determined
|
|
self.assertIn('name', self.my_resource.fields)
|
|
|
|
# check that resource instance fields attr isn't link to resource cls
|
|
# fields
|
|
self.assertFalse(
|
|
MyResource.fields is self.my_resource.fields
|
|
)
|
|
|
|
# dynamically add new resource field into resource instance
|
|
self.my_resource.fields.update(
|
|
OrderedDict([
|
|
('new_field', fields.Field()),
|
|
])
|
|
)
|
|
|
|
# check that new field in resource instance fields
|
|
self.assertIn(
|
|
'new_field',
|
|
self.my_resource.fields
|
|
)
|
|
|
|
# check that new field not in resource cls fields
|
|
self.assertNotIn(
|
|
'new_field',
|
|
MyResource.fields
|
|
)
|
|
|
|
def test_field_column_name(self):
|
|
field = self.my_resource.fields['name']
|
|
self.assertIn(field.column_name, 'name')
|
|
|
|
def test_meta(self):
|
|
self.assertIsInstance(self.my_resource._meta,
|
|
resources.ResourceOptions)
|
|
|
|
def test_get_export_order(self):
|
|
self.assertEqual(self.my_resource.get_export_headers(),
|
|
['email', 'name', 'extra'])
|
|
|
|
# Issue 140 Attributes aren't inherited by subclasses
|
|
def test_inheritance(self):
|
|
class A(MyResource):
|
|
inherited = fields.Field()
|
|
|
|
class Meta:
|
|
import_id_fields = ('email',)
|
|
|
|
class B(A):
|
|
local = fields.Field()
|
|
|
|
class Meta:
|
|
export_order = ('email', 'extra')
|
|
|
|
resource = B()
|
|
self.assertIn('name', resource.fields)
|
|
self.assertIn('inherited', resource.fields)
|
|
self.assertIn('local', resource.fields)
|
|
self.assertEqual(resource.get_export_headers(),
|
|
['email', 'extra', 'name', 'inherited', 'local'])
|
|
self.assertEqual(resource._meta.import_id_fields, ('email',))
|
|
|
|
def test_inheritance_with_custom_attributes(self):
|
|
class A(MyResource):
|
|
inherited = fields.Field()
|
|
|
|
class Meta:
|
|
import_id_fields = ('email',)
|
|
custom_attribute = True
|
|
|
|
class B(A):
|
|
local = fields.Field()
|
|
|
|
resource = B()
|
|
self.assertEqual(resource._meta.custom_attribute, True)
|
|
|
|
|
|
class AuthorResource(resources.ModelResource):
|
|
|
|
books = fields.Field(
|
|
column_name='books',
|
|
attribute='book_set',
|
|
readonly=True,
|
|
)
|
|
|
|
class Meta:
|
|
model = Author
|
|
export_order = ('name', 'books')
|
|
|
|
|
|
class BookResource(resources.ModelResource):
|
|
published = fields.Field(column_name='published_date')
|
|
|
|
class Meta:
|
|
model = Book
|
|
exclude = ('imported', )
|
|
|
|
|
|
class CategoryResource(resources.ModelResource):
|
|
|
|
class Meta:
|
|
model = Category
|
|
|
|
|
|
class ProfileResource(resources.ModelResource):
|
|
class Meta:
|
|
model = Profile
|
|
exclude = ('user', )
|
|
|
|
|
|
class WithDefaultResource(resources.ModelResource):
|
|
class Meta:
|
|
model = WithDefault
|
|
fields = ('name',)
|
|
|
|
|
|
class ModelResourceTest(TestCase):
|
|
def setUp(self):
|
|
self.resource = BookResource()
|
|
|
|
self.book = Book.objects.create(name="Some book")
|
|
self.dataset = tablib.Dataset(headers=['id', 'name', 'author_email',
|
|
'price'])
|
|
row = [self.book.pk, 'Some book', 'test@example.com', "10.25"]
|
|
self.dataset.append(row)
|
|
|
|
def test_default_instance_loader_class(self):
|
|
self.assertIs(self.resource._meta.instance_loader_class,
|
|
ModelInstanceLoader)
|
|
|
|
def test_fields(self):
|
|
fields = self.resource.fields
|
|
self.assertIn('id', fields)
|
|
self.assertIn('name', fields)
|
|
self.assertIn('author_email', fields)
|
|
self.assertIn('price', fields)
|
|
|
|
def test_fields_foreign_key(self):
|
|
fields = self.resource.fields
|
|
self.assertIn('author', fields)
|
|
widget = fields['author'].widget
|
|
self.assertIsInstance(widget, widgets.ForeignKeyWidget)
|
|
self.assertEqual(widget.model, Author)
|
|
|
|
def test_fields_m2m(self):
|
|
fields = self.resource.fields
|
|
self.assertIn('categories', fields)
|
|
|
|
def test_excluded_fields(self):
|
|
self.assertNotIn('imported', self.resource.fields)
|
|
|
|
def test_init_instance(self):
|
|
instance = self.resource.init_instance()
|
|
self.assertIsInstance(instance, Book)
|
|
|
|
def test_default(self):
|
|
self.assertEquals(WithDefaultResource.fields['name'].clean({'name': ''}), 'foo_bar')
|
|
|
|
def test_get_instance(self):
|
|
instance_loader = self.resource._meta.instance_loader_class(
|
|
self.resource)
|
|
self.resource._meta.import_id_fields = ['id']
|
|
instance = self.resource.get_instance(instance_loader,
|
|
self.dataset.dict[0])
|
|
self.assertEqual(instance, self.book)
|
|
|
|
def test_get_instance_import_id_fields(self):
|
|
|
|
class BookResource(resources.ModelResource):
|
|
name = fields.Field(attribute='name', widget=widgets.CharWidget())
|
|
|
|
class Meta:
|
|
model = Book
|
|
import_id_fields = ['name']
|
|
|
|
resource = BookResource()
|
|
instance_loader = resource._meta.instance_loader_class(resource)
|
|
instance = resource.get_instance(instance_loader, self.dataset.dict[0])
|
|
self.assertEqual(instance, self.book)
|
|
|
|
def test_get_instance_with_missing_field_data(self):
|
|
instance_loader = self.resource._meta.instance_loader_class(
|
|
self.resource)
|
|
# construct a dataset with a missing "id" column
|
|
dataset = tablib.Dataset(headers=['name', 'author_email', 'price'])
|
|
dataset.append(['Some book', 'test@example.com', "10.25"])
|
|
with self.assertRaises(KeyError) as cm:
|
|
self.resource.get_instance(instance_loader, dataset.dict[0])
|
|
self.assertEqual(u"Column 'id' not found in dataset. Available columns "
|
|
"are: %s" % [u'name', u'author_email', u'price'],
|
|
cm.exception.args[0])
|
|
|
|
def test_get_export_headers(self):
|
|
headers = self.resource.get_export_headers()
|
|
self.assertEqual(headers, ['published_date', 'id', 'name', 'author',
|
|
'author_email', 'published_time', 'price',
|
|
'categories', ])
|
|
|
|
def test_export(self):
|
|
dataset = self.resource.export(Book.objects.all())
|
|
self.assertEqual(len(dataset), 1)
|
|
|
|
def test_export_iterable(self):
|
|
dataset = self.resource.export(list(Book.objects.all()))
|
|
self.assertEqual(len(dataset), 1)
|
|
|
|
def test_get_diff(self):
|
|
diff = Diff(self.resource, self.book, False)
|
|
book2 = Book(name="Some other book")
|
|
diff.compare_with(self.resource, book2)
|
|
html = diff.as_html()
|
|
headers = self.resource.get_export_headers()
|
|
self.assertEqual(html[headers.index('name')],
|
|
u'<span>Some </span><ins style="background:#e6ffe6;">'
|
|
u'other </ins><span>book</span>')
|
|
self.assertFalse(html[headers.index('author_email')])
|
|
|
|
@skip("See: https://github.com/django-import-export/django-import-export/issues/311")
|
|
def test_get_diff_with_callable_related_manager(self):
|
|
resource = AuthorResource()
|
|
author = Author(name="Some author")
|
|
author.save()
|
|
author2 = Author(name="Some author")
|
|
self.book.author = author
|
|
self.book.save()
|
|
diff = Diff(self.resource, author, False)
|
|
diff.compare_with(self.resource, author2)
|
|
html = diff.as_html()
|
|
headers = resource.get_export_headers()
|
|
self.assertEqual(html[headers.index('books')],
|
|
'<span>core.Book.None</span>')
|
|
|
|
def test_import_data(self):
|
|
result = self.resource.import_data(self.dataset, raise_errors=True)
|
|
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(len(result.rows), 1)
|
|
self.assertTrue(result.rows[0].diff)
|
|
self.assertEqual(result.rows[0].import_type,
|
|
results.RowResult.IMPORT_TYPE_UPDATE)
|
|
|
|
instance = Book.objects.get(pk=self.book.pk)
|
|
self.assertEqual(instance.author_email, 'test@example.com')
|
|
self.assertEqual(instance.price, Decimal("10.25"))
|
|
|
|
def test_import_data_value_error_includes_field_name(self):
|
|
class AuthorResource(resources.ModelResource):
|
|
class Meta:
|
|
model = Author
|
|
|
|
resource = AuthorResource()
|
|
dataset = tablib.Dataset(headers=['id', 'name', 'birthday'])
|
|
dataset.append(['', 'A.A.Milne', '1882test-01-18'])
|
|
|
|
result = resource.import_data(dataset, raise_errors=False)
|
|
|
|
self.assertTrue(result.has_errors())
|
|
self.assertTrue(result.rows[0].errors)
|
|
msg = ("Column 'birthday': Enter a valid date/time.")
|
|
actual = result.rows[0].errors[0].error
|
|
self.assertIsInstance(actual, ValueError)
|
|
self.assertEqual(msg, str(actual))
|
|
|
|
def test_import_data_error_saving_model(self):
|
|
row = list(self.dataset.pop())
|
|
# set pk to something that would yield error
|
|
row[0] = 'foo'
|
|
self.dataset.append(row)
|
|
result = self.resource.import_data(self.dataset, raise_errors=False)
|
|
|
|
self.assertTrue(result.has_errors())
|
|
self.assertTrue(result.rows[0].errors)
|
|
actual = result.rows[0].errors[0].error
|
|
self.assertIsInstance(actual, ValueError)
|
|
self.assertIn("Column 'id': could not convert string to float",
|
|
str(actual))
|
|
|
|
def test_import_data_delete(self):
|
|
|
|
class B(BookResource):
|
|
delete = fields.Field(widget=widgets.BooleanWidget())
|
|
|
|
def for_delete(self, row, instance):
|
|
return self.fields['delete'].clean(row)
|
|
|
|
row = [self.book.pk, self.book.name, '1']
|
|
dataset = tablib.Dataset(*[row], headers=['id', 'name', 'delete'])
|
|
result = B().import_data(dataset, raise_errors=True)
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(result.rows[0].import_type,
|
|
results.RowResult.IMPORT_TYPE_DELETE)
|
|
self.assertFalse(Book.objects.filter(pk=self.book.pk))
|
|
|
|
def test_save_instance_with_dry_run_flag(self):
|
|
class B(BookResource):
|
|
def before_save_instance(self, instance, using_transactions, dry_run):
|
|
super(B, self).before_save_instance(instance, using_transactions, dry_run)
|
|
if dry_run:
|
|
self.before_save_instance_dry_run = True
|
|
else:
|
|
self.before_save_instance_dry_run = False
|
|
def save_instance(self, instance, using_transactions=True, dry_run=False):
|
|
super(B, self).save_instance(instance, using_transactions, dry_run)
|
|
if dry_run:
|
|
self.save_instance_dry_run = True
|
|
else:
|
|
self.save_instance_dry_run = False
|
|
def after_save_instance(self, instance, using_transactions, dry_run):
|
|
super(B, self).after_save_instance(instance, using_transactions, dry_run)
|
|
if dry_run:
|
|
self.after_save_instance_dry_run = True
|
|
else:
|
|
self.after_save_instance_dry_run = False
|
|
|
|
resource = B()
|
|
resource.import_data(self.dataset, dry_run=True, raise_errors=True)
|
|
self.assertTrue(resource.before_save_instance_dry_run)
|
|
self.assertTrue(resource.save_instance_dry_run)
|
|
self.assertTrue(resource.after_save_instance_dry_run)
|
|
|
|
resource.import_data(self.dataset, dry_run=False, raise_errors=True)
|
|
self.assertFalse(resource.before_save_instance_dry_run)
|
|
self.assertFalse(resource.save_instance_dry_run)
|
|
self.assertFalse(resource.after_save_instance_dry_run)
|
|
|
|
def test_delete_instance_with_dry_run_flag(self):
|
|
class B(BookResource):
|
|
delete = fields.Field(widget=widgets.BooleanWidget())
|
|
|
|
def for_delete(self, row, instance):
|
|
return self.fields['delete'].clean(row)
|
|
|
|
def before_delete_instance(self, instance, dry_run):
|
|
super(B, self).before_delete_instance(instance, dry_run)
|
|
if dry_run:
|
|
self.before_delete_instance_dry_run = True
|
|
else:
|
|
self.before_delete_instance_dry_run = False
|
|
|
|
def delete_instance(self, instance, using_transactions=True, dry_run=False):
|
|
super(B, self).delete_instance(instance, using_transactions, dry_run)
|
|
if dry_run:
|
|
self.delete_instance_dry_run = True
|
|
else:
|
|
self.delete_instance_dry_run = False
|
|
|
|
def after_delete_instance(self, instance, dry_run):
|
|
super(B, self).after_delete_instance(instance, dry_run)
|
|
if dry_run:
|
|
self.after_delete_instance_dry_run = True
|
|
else:
|
|
self.after_delete_instance_dry_run = False
|
|
|
|
resource = B()
|
|
row = [self.book.pk, self.book.name, '1']
|
|
dataset = tablib.Dataset(*[row], headers=['id', 'name', 'delete'])
|
|
resource.import_data(dataset, dry_run=True, raise_errors=True)
|
|
self.assertTrue(resource.before_delete_instance_dry_run)
|
|
self.assertTrue(resource.delete_instance_dry_run)
|
|
self.assertTrue(resource.after_delete_instance_dry_run)
|
|
|
|
resource.import_data(dataset, dry_run=False, raise_errors=True)
|
|
self.assertFalse(resource.before_delete_instance_dry_run)
|
|
self.assertFalse(resource.delete_instance_dry_run)
|
|
self.assertFalse(resource.after_delete_instance_dry_run)
|
|
|
|
def test_relationships_fields(self):
|
|
|
|
class B(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('author__name',)
|
|
|
|
author = Author.objects.create(name="Author")
|
|
self.book.author = author
|
|
resource = B()
|
|
result = resource.fields['author__name'].export(self.book)
|
|
self.assertEqual(result, author.name)
|
|
|
|
def test_dehydrating_fields(self):
|
|
|
|
class B(resources.ModelResource):
|
|
full_title = fields.Field(column_name="Full title")
|
|
|
|
class Meta:
|
|
model = Book
|
|
fields = ('author__name', 'full_title')
|
|
|
|
def dehydrate_full_title(self, obj):
|
|
return '%s by %s' % (obj.name, obj.author.name)
|
|
|
|
author = Author.objects.create(name="Author")
|
|
self.book.author = author
|
|
resource = B()
|
|
full_title = resource.export_field(resource.get_fields()[0], self.book)
|
|
self.assertEqual(full_title, '%s by %s' % (self.book.name,
|
|
self.book.author.name))
|
|
|
|
def test_widget_fomat_in_fk_field(self):
|
|
class B(resources.ModelResource):
|
|
|
|
class Meta:
|
|
model = Book
|
|
fields = ('author__birthday',)
|
|
widgets = {
|
|
'author__birthday': {'format': '%Y-%m-%d'},
|
|
}
|
|
|
|
author = Author.objects.create(name="Author")
|
|
self.book.author = author
|
|
resource = B()
|
|
result = resource.fields['author__birthday'].export(self.book)
|
|
self.assertEqual(result, str(date.today()))
|
|
|
|
def test_widget_kwargs_for_field(self):
|
|
|
|
class B(resources.ModelResource):
|
|
|
|
class Meta:
|
|
model = Book
|
|
fields = ('published',)
|
|
widgets = {
|
|
'published': {'format': '%d.%m.%Y'},
|
|
}
|
|
|
|
resource = B()
|
|
self.book.published = date(2012, 8, 13)
|
|
result = resource.fields['published'].export(self.book)
|
|
self.assertEqual(result, "13.08.2012")
|
|
|
|
def test_foreign_keys_export(self):
|
|
author1 = Author.objects.create(name='Foo')
|
|
self.book.author = author1
|
|
self.book.save()
|
|
|
|
dataset = self.resource.export(Book.objects.all())
|
|
self.assertEqual(dataset.dict[0]['author'], author1.pk)
|
|
|
|
def test_foreign_keys_import(self):
|
|
author2 = Author.objects.create(name='Bar')
|
|
headers = ['id', 'name', 'author']
|
|
row = [None, 'FooBook', author2.pk]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
self.resource.import_data(dataset, raise_errors=True)
|
|
|
|
book = Book.objects.get(name='FooBook')
|
|
self.assertEqual(book.author, author2)
|
|
|
|
def test_m2m_export(self):
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
cat2 = Category.objects.create(name='Cat 2')
|
|
self.book.categories.add(cat1)
|
|
self.book.categories.add(cat2)
|
|
|
|
dataset = self.resource.export(Book.objects.all())
|
|
self.assertEqual(dataset.dict[0]['categories'],
|
|
'%d,%d' % (cat1.pk, cat2.pk))
|
|
|
|
def test_m2m_import(self):
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
headers = ['id', 'name', 'categories']
|
|
row = [None, 'FooBook', "%s" % cat1.pk]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
self.resource.import_data(dataset, raise_errors=True)
|
|
|
|
book = Book.objects.get(name='FooBook')
|
|
self.assertIn(cat1, book.categories.all())
|
|
|
|
def test_m2m_options_import(self):
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
cat2 = Category.objects.create(name='Cat 2')
|
|
headers = ['id', 'name', 'categories']
|
|
row = [None, 'FooBook', "Cat 1|Cat 2"]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
|
|
class BookM2MResource(resources.ModelResource):
|
|
categories = fields.Field(
|
|
attribute='categories',
|
|
widget=widgets.ManyToManyWidget(Category, field='name',
|
|
separator='|')
|
|
)
|
|
|
|
class Meta:
|
|
model = Book
|
|
|
|
resource = BookM2MResource()
|
|
resource.import_data(dataset, raise_errors=True)
|
|
book = Book.objects.get(name='FooBook')
|
|
self.assertIn(cat1, book.categories.all())
|
|
self.assertIn(cat2, book.categories.all())
|
|
|
|
def test_related_one_to_one(self):
|
|
# issue #17 - Exception when attempting access something on the
|
|
# related_name
|
|
|
|
user = User.objects.create(username='foo')
|
|
profile = Profile.objects.create(user=user)
|
|
Entry.objects.create(user=user)
|
|
Entry.objects.create(user=User.objects.create(username='bar'))
|
|
|
|
class EntryResource(resources.ModelResource):
|
|
class Meta:
|
|
model = Entry
|
|
fields = ('user__profile', 'user__profile__is_private')
|
|
|
|
resource = EntryResource()
|
|
dataset = resource.export(Entry.objects.all())
|
|
self.assertEqual(dataset.dict[0]['user__profile'], profile.pk)
|
|
self.assertEqual(dataset.dict[0]['user__profile__is_private'], '1')
|
|
self.assertEqual(dataset.dict[1]['user__profile'], '')
|
|
self.assertEqual(dataset.dict[1]['user__profile__is_private'], '')
|
|
|
|
def test_empty_get_queryset(self):
|
|
# issue #25 - Overriding queryset on export() fails when passed
|
|
# queryset has zero elements
|
|
dataset = self.resource.export(Book.objects.none())
|
|
self.assertEqual(len(dataset), 0)
|
|
|
|
def test_import_data_skip_unchanged(self):
|
|
def attempted_save(instance, real_dry_run):
|
|
self.fail('Resource attempted to save instead of skipping')
|
|
|
|
# Make sure we test with ManyToMany related objects
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
cat2 = Category.objects.create(name='Cat 2')
|
|
self.book.categories.add(cat1)
|
|
self.book.categories.add(cat2)
|
|
dataset = self.resource.export()
|
|
|
|
# Create a new resource that attempts to reimport the data currently
|
|
# in the database while skipping unchanged rows (i.e. all of them)
|
|
resource = deepcopy(self.resource)
|
|
resource._meta.skip_unchanged = True
|
|
# Fail the test if the resource attempts to save the row
|
|
resource.save_instance = attempted_save
|
|
result = resource.import_data(dataset, raise_errors=True)
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(len(result.rows), len(dataset))
|
|
self.assertTrue(result.rows[0].diff)
|
|
self.assertEqual(result.rows[0].import_type,
|
|
results.RowResult.IMPORT_TYPE_SKIP)
|
|
|
|
# Test that we can suppress reporting of skipped rows
|
|
resource._meta.report_skipped = False
|
|
result = resource.import_data(dataset, raise_errors=True)
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(len(result.rows), 0)
|
|
|
|
def test_before_import_access_to_kwargs(self):
|
|
class B(BookResource):
|
|
def before_import(self, dataset, using_transactions, dry_run, **kwargs):
|
|
if 'extra_arg' in kwargs:
|
|
dataset.headers[dataset.headers.index('author_email')] = 'old_email'
|
|
dataset.insert_col(0,
|
|
lambda row: kwargs['extra_arg'],
|
|
header='author_email')
|
|
|
|
resource = B()
|
|
result = resource.import_data(self.dataset, raise_errors=True,
|
|
extra_arg='extra@example.com')
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(len(result.rows), 1)
|
|
instance = Book.objects.get(pk=self.book.pk)
|
|
self.assertEqual(instance.author_email, 'extra@example.com')
|
|
|
|
def test_before_import_raises_error(self):
|
|
class B(BookResource):
|
|
def before_import(self, dataset, using_transactions, dry_run, **kwargs):
|
|
raise Exception('This is an invalid dataset')
|
|
|
|
resource = B()
|
|
with self.assertRaises(Exception) as cm:
|
|
resource.import_data(self.dataset, raise_errors=True)
|
|
self.assertEqual(u"This is an invalid dataset", cm.exception.args[0])
|
|
|
|
def test_after_import_raises_error(self):
|
|
class B(BookResource):
|
|
def after_import(self, dataset, result, using_transactions, dry_run, **kwargs):
|
|
raise Exception('This is an invalid dataset')
|
|
|
|
resource = B()
|
|
with self.assertRaises(Exception) as cm:
|
|
resource.import_data(self.dataset, raise_errors=True)
|
|
self.assertEqual(u"This is an invalid dataset", cm.exception.args[0])
|
|
|
|
def test_link_to_nonexistent_field(self):
|
|
with self.assertRaises(FieldDoesNotExist) as cm:
|
|
class BrokenBook1(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('nonexistent__invalid',)
|
|
self.assertEqual("Book.nonexistent: Book has no field named 'nonexistent'",
|
|
cm.exception.args[0])
|
|
|
|
with self.assertRaises(FieldDoesNotExist) as cm:
|
|
class BrokenBook2(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('author__nonexistent',)
|
|
self.assertEqual("Book.author.nonexistent: Author has no field named "
|
|
"'nonexistent'", cm.exception.args[0])
|
|
|
|
def test_link_to_nonrelation_field(self):
|
|
with self.assertRaises(KeyError) as cm:
|
|
class BrokenBook1(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('published__invalid',)
|
|
self.assertEqual("Book.published is not a relation",
|
|
cm.exception.args[0])
|
|
|
|
with self.assertRaises(KeyError) as cm:
|
|
class BrokenBook2(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('author__name__invalid',)
|
|
self.assertEqual("Book.author.name is not a relation",
|
|
cm.exception.args[0])
|
|
|
|
def test_override_field_construction_in_resource(self):
|
|
class B(resources.ModelResource):
|
|
class Meta:
|
|
model = Book
|
|
fields = ('published',)
|
|
|
|
@classmethod
|
|
def field_from_django_field(self, field_name, django_field,
|
|
readonly):
|
|
if field_name == 'published':
|
|
return {'sound': 'quack'}
|
|
|
|
B()
|
|
self.assertEqual({'sound': 'quack'}, B.fields['published'])
|
|
|
|
def test_readonly_annotated_field_import_and_export(self):
|
|
class B(BookResource):
|
|
total_categories = fields.Field('total_categories', readonly=True)
|
|
|
|
class Meta:
|
|
model = Book
|
|
skip_unchanged = True
|
|
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
self.book.categories.add(cat1)
|
|
|
|
resource = B()
|
|
|
|
# Verify that the annotated field is correctly exported
|
|
dataset = resource.export(
|
|
Book.objects.annotate(total_categories=Count('categories')))
|
|
self.assertEqual(int(dataset.dict[0]['total_categories']), 1)
|
|
|
|
# Verify that importing the annotated field raises no errors and that
|
|
# the rows are skipped
|
|
result = resource.import_data(dataset, raise_errors=True)
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEqual(len(result.rows), len(dataset))
|
|
self.assertEqual(
|
|
result.rows[0].import_type, results.RowResult.IMPORT_TYPE_SKIP)
|
|
|
|
def test_follow_relationship_for_modelresource(self):
|
|
|
|
class EntryResource(resources.ModelResource):
|
|
username = fields.Field(attribute='user__username', readonly=False)
|
|
|
|
class Meta:
|
|
model = Entry
|
|
fields = ('id', )
|
|
|
|
def after_save_instance(self, instance, using_transactions, dry_run):
|
|
if not using_transactions and dry_run:
|
|
# we don't have transactions and we want to do a dry_run
|
|
pass
|
|
else:
|
|
instance.user.save()
|
|
|
|
user = User.objects.create(username='foo')
|
|
entry = Entry.objects.create(user=user)
|
|
row = [
|
|
entry.pk,
|
|
'bar',
|
|
]
|
|
self.dataset = tablib.Dataset(headers=['id', 'username'])
|
|
self.dataset.append(row)
|
|
result = EntryResource().import_data(
|
|
self.dataset, raise_errors=True, dry_run=False)
|
|
self.assertFalse(result.has_errors())
|
|
self.assertEquals(User.objects.get(pk=user.pk).username, 'bar')
|
|
|
|
def test_import_data_dynamic_default_callable(self):
|
|
|
|
class DynamicDefaultResource(resources.ModelResource):
|
|
class Meta:
|
|
model = WithDynamicDefault
|
|
fields = ('id', 'name',)
|
|
|
|
self.assertTrue(callable(DynamicDefaultResource.fields['name'].default))
|
|
|
|
resource = DynamicDefaultResource()
|
|
dataset = tablib.Dataset(headers=['id', 'name', ])
|
|
dataset.append([1, None])
|
|
dataset.append([2, None])
|
|
resource.import_data(dataset, raise_errors=False)
|
|
objs = WithDynamicDefault.objects.all()
|
|
self.assertNotEqual(objs[0].name, objs[1].name)
|
|
|
|
def test_float_field(self):
|
|
#433
|
|
class R(resources.ModelResource):
|
|
class Meta:
|
|
model = WithFloatField
|
|
resource = R()
|
|
dataset = tablib.Dataset(headers=['id', 'f', ])
|
|
dataset.append([None, None])
|
|
dataset.append([None, ''])
|
|
resource.import_data(dataset, raise_errors=True)
|
|
self.assertEqual(WithFloatField.objects.all()[0].f, None)
|
|
self.assertEqual(WithFloatField.objects.all()[1].f, None)
|
|
|
|
|
|
class ModelResourceTransactionTest(TransactionTestCase):
|
|
@skipUnlessDBFeature('supports_transactions')
|
|
def test_m2m_import_with_transactions(self):
|
|
resource = BookResource()
|
|
cat1 = Category.objects.create(name='Cat 1')
|
|
headers = ['id', 'name', 'categories']
|
|
row = [None, 'FooBook', "%s" % cat1.pk]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
|
|
result = resource.import_data(
|
|
dataset, dry_run=True, use_transactions=True
|
|
)
|
|
|
|
row_diff = result.rows[0].diff
|
|
fields = resource.get_fields()
|
|
|
|
id_field = resource.fields['id']
|
|
id_diff = row_diff[fields.index(id_field)]
|
|
# id diff should exist because in rollbacked transaction
|
|
# FooBook has been saved
|
|
self.assertTrue(id_diff)
|
|
|
|
category_field = resource.fields['categories']
|
|
categories_diff = row_diff[fields.index(category_field)]
|
|
self.assertEqual(strip_tags(categories_diff), force_text(cat1.pk))
|
|
|
|
# check that it is really rollbacked
|
|
self.assertFalse(Book.objects.filter(name='FooBook'))
|
|
|
|
@skipUnlessDBFeature('supports_transactions')
|
|
def test_m2m_import_with_transactions_error(self):
|
|
resource = ProfileResource()
|
|
headers = ['id', 'user']
|
|
# 'user' is a required field, the database will raise an error.
|
|
row = [None, None]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
|
|
result = resource.import_data(
|
|
dataset, dry_run=True, use_transactions=True
|
|
)
|
|
|
|
# Ensure the error raised by the database has been saved.
|
|
self.assertTrue(result.has_errors())
|
|
|
|
# Ensure the rollback has worked properly.
|
|
self.assertEqual(Profile.objects.count(), 0)
|
|
|
|
@skipUnlessDBFeature('supports_transactions')
|
|
def test_integrity_error_rollback_on_savem2m(self):
|
|
# savepoint_rollback() after an IntegrityError gives
|
|
# TransactionManagementError (#399)
|
|
class CategoryResourceRaisesIntegrityError(CategoryResource):
|
|
def save_m2m(self, instance, *args, **kwargs):
|
|
# force raising IntegrityError
|
|
Category.objects.create(name=instance.name)
|
|
|
|
resource = CategoryResourceRaisesIntegrityError()
|
|
headers = ['id', 'name']
|
|
rows = [
|
|
[None, 'foo'],
|
|
]
|
|
dataset = tablib.Dataset(*rows, headers=headers)
|
|
result = resource.import_data(
|
|
dataset,
|
|
use_transactions=True,
|
|
)
|
|
self.assertTrue(result.has_errors())
|
|
|
|
@skipUnlessDBFeature('supports_transactions')
|
|
def test_multiple_database_errors(self):
|
|
|
|
class CategoryResourceDbErrorsResource(CategoryResource):
|
|
|
|
def before_import(self, *args, **kwargs):
|
|
raise DatabaseError()
|
|
|
|
def save_instance(self):
|
|
raise DatabaseError()
|
|
|
|
resource = CategoryResourceDbErrorsResource()
|
|
headers = ['id', 'name']
|
|
rows = [
|
|
[None, 'foo'],
|
|
]
|
|
dataset = tablib.Dataset(*rows, headers=headers)
|
|
result = resource.import_data(
|
|
dataset,
|
|
use_transactions=True,
|
|
)
|
|
self.assertTrue(result.has_errors())
|
|
|
|
|
|
class ModelResourceFactoryTest(TestCase):
|
|
|
|
def test_create(self):
|
|
BookResource = resources.modelresource_factory(Book)
|
|
self.assertIn('id', BookResource.fields)
|
|
self.assertEqual(BookResource._meta.model, Book)
|
|
|
|
|
|
@skipUnless(
|
|
'postgresql' in settings.DATABASES['default']['ENGINE'],
|
|
'Run only against Postgres')
|
|
class PostgresTests(TransactionTestCase):
|
|
# Make sure to start the sequences back at 1
|
|
reset_sequences = True
|
|
|
|
def test_create_object_after_importing_dataset_with_id(self):
|
|
dataset = tablib.Dataset(headers=['id', 'name'])
|
|
dataset.append([1, 'Some book'])
|
|
resource = BookResource()
|
|
result = resource.import_data(dataset)
|
|
self.assertFalse(result.has_errors())
|
|
try:
|
|
Book.objects.create(name='Some other book')
|
|
except IntegrityError:
|
|
self.fail('IntegrityError was raised.')
|
|
|
|
def test_collect_failed_rows(self):
|
|
resource = ProfileResource()
|
|
headers = ['id', 'user']
|
|
# 'user' is a required field, the database will raise an error.
|
|
row = [None, None]
|
|
dataset = tablib.Dataset(row, headers=headers)
|
|
result = resource.import_data(
|
|
dataset, dry_run=True, use_transactions=True,
|
|
collect_failed_rows=True,
|
|
)
|
|
self.assertEqual(
|
|
result.failed_dataset.headers,
|
|
[u'id', u'user', u'Error']
|
|
)
|
|
self.assertEqual(len(result.failed_dataset), 1)
|
|
# We can't check the error message because it's package- and version-dependent
|
|
|
|
|
|
if 'postgresql' in settings.DATABASES['default']['ENGINE']:
|
|
from django.contrib.postgres.fields import ArrayField
|
|
from django.db import models
|
|
|
|
class BookWithChapters(models.Model):
|
|
name = models.CharField('Book name', max_length=100)
|
|
chapters = ArrayField(models.CharField(max_length=100), default=list)
|
|
|
|
class ArrayFieldTest(TestCase):
|
|
fixtures = []
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
def test_arrayfield(self):
|
|
dataset_headers = ["id", "name", "chapters"]
|
|
chapters = ["Introduction", "Middle Chapter", "Ending"]
|
|
dataset_row = ["1", "Book With Chapters", ",".join(chapters)]
|
|
dataset = tablib.Dataset(headers=dataset_headers)
|
|
dataset.append(dataset_row)
|
|
book_with_chapters_resource = resources.modelresource_factory(model=BookWithChapters)()
|
|
result = book_with_chapters_resource.import_data(dataset, dry_run=False)
|
|
self.assertFalse(result.has_errors())
|
|
book_with_chapters = list(BookWithChapters.objects.all())[0]
|
|
self.assertListEqual(book_with_chapters.chapters, chapters)
|
|
|
|
|
|
class ForeignKeyWidgetFollowRelationship(TestCase):
|
|
def setUp(self):
|
|
self.user = User.objects.create(username='foo')
|
|
self.role = Role.objects.create(user=self.user)
|
|
self.person = Person.objects.create(role=self.role)
|
|
|
|
def test_export(self):
|
|
class MyPersonResource(resources.ModelResource):
|
|
role = fields.Field(
|
|
column_name='role',
|
|
attribute='role',
|
|
widget=widgets.ForeignKeyWidget(Role, field='user__username')
|
|
)
|
|
|
|
class Meta:
|
|
model = Person
|
|
fields = ['id', 'role']
|
|
|
|
resource = MyPersonResource()
|
|
dataset = resource.export(Person.objects.all())
|
|
self.assertEqual(len(dataset), 1)
|
|
self.assertEqual(dataset[0][0], 'foo')
|
|
|
|
self.role.user = None
|
|
self.role.save()
|
|
|
|
resource = MyPersonResource()
|
|
dataset = resource.export(Person.objects.all())
|
|
self.assertEqual(len(dataset), 1)
|
|
self.assertEqual(dataset[0][0], None)
|
|
|
|
|
|
class ManyRelatedManagerDiffTest(TestCase):
|
|
fixtures = ["category", "book"]
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
def test_related_manager_diff(self):
|
|
dataset_headers = ["id", "name", "categories"]
|
|
dataset_row = ["1", "Test Book", "1"]
|
|
original_dataset = tablib.Dataset(headers=dataset_headers)
|
|
original_dataset.append(dataset_row)
|
|
dataset_row[2] = "2"
|
|
changed_dataset = tablib.Dataset(headers=dataset_headers)
|
|
changed_dataset.append(dataset_row)
|
|
|
|
book_resource = BookResource()
|
|
export_headers = book_resource.get_export_headers()
|
|
|
|
add_result = book_resource.import_data(original_dataset, dry_run=False)
|
|
expected_value = u'<ins style="background:#e6ffe6;">1</ins>'
|
|
self.check_value(add_result, export_headers, expected_value)
|
|
change_result = book_resource.import_data(changed_dataset, dry_run=False)
|
|
expected_value = u'<del style="background:#ffe6e6;">1</del><ins style="background:#e6ffe6;">2</ins>'
|
|
self.check_value(change_result, export_headers, expected_value)
|
|
|
|
def check_value(self, result, export_headers, expected_value):
|
|
self.assertEqual(len(result.rows), 1)
|
|
diff = result.rows[0].diff
|
|
self.assertEqual(diff[export_headers.index("categories")],
|
|
expected_value)
|