debian-django-cachalot/cachalot/tests/postgres.py

439 lines
18 KiB
Python

# coding: utf-8
from __future__ import unicode_literals
from datetime import date, datetime
from decimal import Decimal
from platform import python_version_tuple
from unittest import skipUnless, skipIf
from django import VERSION as django_version
from django.core.cache import caches
from django.core.cache.backends.filebased import FileBasedCache
from django.db import connection
from django.test import TransactionTestCase, override_settings
from psycopg2.extras import NumericRange, DateRange, DateTimeTZRange
from pytz import timezone
from .models import PostgresModel, Test
DJANGO_GTE_1_9 = django_version[:2] >= (1, 9)
if DJANGO_GTE_1_9:
from django.contrib.postgres.functions import TransactionNow
# FIXME: Add tests for aggregations.
@skipUnless(connection.vendor == 'postgresql',
'This test is only for PostgreSQL')
@skipIf(isinstance(caches['default'], FileBasedCache)
and python_version_tuple()[:2] == ('2', '7'),
'Caching psycopg2 objects is not working with file-based cache '
'and Python 2.7 (see https://code.djangoproject.com/ticket/25501).')
@override_settings(USE_TZ=True)
class PostgresReadTestCase(TransactionTestCase):
def setUp(self):
self.obj1 = PostgresModel(
int_array=[1, 2, 3],
hstore={'a': 'b', 'c': None},
int_range=[1900, 2000], float_range=[-1e3, 9.87654321],
date_range=['1678-03-04', '1741-07-28'],
datetime_range=[datetime(1989, 1, 30, 12, 20,
tzinfo=timezone('Europe/Paris')), None])
if DJANGO_GTE_1_9:
self.obj1.json = {'a': 1, 'b': 2}
self.obj1.save()
self.obj2 = PostgresModel(
int_array=[4, None, 6],
hstore={'a': '1', 'b': '2'},
int_range=[1989, None], float_range=[0.0, None],
date_range=['1989-01-30', None],
datetime_range=[None, None])
if DJANGO_GTE_1_9:
self.obj2.json = [
'something',
{
'a': 1,
'b': None,
'c': 123.456,
'd': True,
'e': {
'another': 'dict',
'and yet': {
'another': 'one',
'with a list': [],
},
},
},
]
self.obj2.save()
def test_unaccent(self):
Test.objects.create(name='Clémentine')
Test.objects.create(name='Clementine')
qs = Test.objects.filter(name__unaccent='Clémentine')
with self.assertNumQueries(1):
data1 = [t.name for t in qs.all()]
with self.assertNumQueries(0):
data2 = [t.name for t in qs.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, ['Clementine', 'Clémentine'])
def test_int_array(self):
qs = PostgresModel.objects.all()
with self.assertNumQueries(1):
data1 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data2 = [o.int_array for o in qs.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [[1, 2, 3], [4, None, 6]])
qs = PostgresModel.objects.filter(int_array__contains=[3])
with self.assertNumQueries(1):
data3 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data4 = [o.int_array for o in qs.all()]
self.assertListEqual(data4, data3)
self.assertListEqual(data4, [[1, 2, 3]])
qs = PostgresModel.objects.filter(int_array__contained_by=[1, 2, 3,
4, 5, 6])
with self.assertNumQueries(1):
data7 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data8 = [o.int_array for o in qs.all()]
self.assertListEqual(data8, data7)
self.assertListEqual(data8, [[1, 2, 3]])
qs = PostgresModel.objects.filter(int_array__overlap=[3, 4])
with self.assertNumQueries(1):
data9 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data10 = [o.int_array for o in qs.all()]
self.assertListEqual(data10, data9)
self.assertListEqual(data10, [[1, 2, 3], [4, None, 6]])
qs = PostgresModel.objects.filter(int_array__len__in=(2, 3))
with self.assertNumQueries(1):
data11 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data12 = [o.int_array for o in qs.all()]
self.assertListEqual(data12, data11)
self.assertListEqual(data12, [[1, 2, 3], [4, None, 6]])
qs = PostgresModel.objects.filter(int_array__2=6)
with self.assertNumQueries(1):
data13 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data14 = [o.int_array for o in qs.all()]
self.assertListEqual(data14, data13)
self.assertListEqual(data14, [[4, None, 6]])
qs = PostgresModel.objects.filter(int_array__0_2=(1, 2))
with self.assertNumQueries(1):
data15 = [o.int_array for o in qs.all()]
with self.assertNumQueries(0):
data16 = [o.int_array for o in qs.all()]
self.assertListEqual(data16, data15)
self.assertListEqual(data16, [[1, 2, 3]])
def test_hstore(self):
qs = PostgresModel.objects.all()
with self.assertNumQueries(1):
data1 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data2 = [o.hstore for o in qs.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [{'a': 'b', 'c': None},
{'a': '1', 'b': '2'}])
qs = PostgresModel.objects.filter(hstore__a='1')
with self.assertNumQueries(1):
data3 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data4 = [o.hstore for o in qs.all()]
self.assertListEqual(data4, data3)
self.assertListEqual(data4, [{'a': '1', 'b': '2'}])
qs = PostgresModel.objects.filter(
hstore__contains={'a': 'b'})
with self.assertNumQueries(1):
data5 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data6 = [o.hstore for o in qs.all()]
self.assertListEqual(data6, data5)
self.assertListEqual(data6, [{'a': 'b', 'c': None}])
qs = PostgresModel.objects.filter(
hstore__contained_by={'a': 'b', 'c': None, 'b': '2'})
with self.assertNumQueries(1):
data7 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data8 = [o.hstore for o in qs.all()]
self.assertListEqual(data8, data7)
self.assertListEqual(data8, [{'a': 'b', 'c': None}])
qs = PostgresModel.objects.filter(hstore__has_key='c')
with self.assertNumQueries(1):
data9 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data10 = [o.hstore for o in qs.all()]
self.assertListEqual(data10, data9)
self.assertListEqual(data10, [{'a': 'b', 'c': None}])
qs = PostgresModel.objects.filter(hstore__has_keys=['a', 'b'])
with self.assertNumQueries(1):
data11 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data12 = [o.hstore for o in qs.all()]
self.assertListEqual(data12, data11)
self.assertListEqual(data12, [{'a': '1', 'b': '2'}])
qs = PostgresModel.objects.filter(hstore__keys=['a', 'b'])
with self.assertNumQueries(1):
data13 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data14 = [o.hstore for o in qs.all()]
self.assertListEqual(data14, data13)
self.assertListEqual(data14, [{'a': '1', 'b': '2'}])
qs = PostgresModel.objects.filter(hstore__values=['1', '2'])
with self.assertNumQueries(1):
data15 = [o.hstore for o in qs.all()]
with self.assertNumQueries(0):
data16 = [o.hstore for o in qs.all()]
self.assertListEqual(data16, data15)
self.assertListEqual(data16, [{'a': '1', 'b': '2'}])
@skipUnless(DJANGO_GTE_1_9,
'JSON field is only available in Django >= 1.9')
def test_json(self):
qs = PostgresModel.objects.all()
with self.assertNumQueries(1):
data1 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data2 = [o.json for o in qs.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [self.obj1.json, self.obj2.json])
# Tests an index.
qs = PostgresModel.objects.filter(json__0='something')
with self.assertNumQueries(1):
data3 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data4 = [o.json for o in qs.all()]
self.assertListEqual(data4, data3)
self.assertListEqual(data4, [self.obj2.json])
qs = PostgresModel.objects.filter(json__0__nonexistent_key='something')
with self.assertNumQueries(1):
data5 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data6 = [o.json for o in qs.all()]
self.assertListEqual(data6, data5)
self.assertListEqual(data6, [])
# Tests a path with spaces.
qs = PostgresModel.objects.filter(
**{'json__1__e__and yet__another': 'one'})
with self.assertNumQueries(1):
data7 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data8 = [o.json for o in qs.all()]
self.assertListEqual(data8, data7)
self.assertListEqual(data8, [self.obj2.json])
qs = PostgresModel.objects.filter(json__contains=['something'])
with self.assertNumQueries(1):
data9 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data10 = [o.json for o in qs.all()]
self.assertListEqual(data10, data9)
self.assertListEqual(data10, [self.obj2.json])
qs = PostgresModel.objects.filter(
json__contained_by={'a': 1, 'b': 2, 'any': 'thing'})
with self.assertNumQueries(1):
data11 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data12 = [o.json for o in qs.all()]
self.assertListEqual(data12, data11)
self.assertListEqual(data12, [self.obj1.json])
qs = PostgresModel.objects.filter(json__has_key='a')
with self.assertNumQueries(1):
data13 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data14 = [o.json for o in qs.all()]
self.assertListEqual(data14, data13)
self.assertListEqual(data14, [self.obj1.json])
qs = PostgresModel.objects.filter(json__has_any_keys=['a', 'b', 'c'])
with self.assertNumQueries(1):
data15 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data16 = [o.json for o in qs.all()]
self.assertListEqual(data16, data15)
self.assertListEqual(data16, [self.obj1.json])
qs = PostgresModel.objects.filter(json__has_keys=['a', 'b'])
with self.assertNumQueries(1):
data17 = [o.json for o in qs.all()]
with self.assertNumQueries(0):
data18 = [o.json for o in qs.all()]
self.assertListEqual(data18, data17)
self.assertListEqual(data18, [self.obj1.json])
def test_int_range(self):
qs = PostgresModel.objects.all()
with self.assertNumQueries(1):
data1 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data2 = [o.int_range for o in qs.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [NumericRange(1900, 2000),
NumericRange(1989)])
qs = PostgresModel.objects.filter(int_range__contains=2015)
with self.assertNumQueries(1):
data3 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data4 = [o.int_range for o in qs.all()]
self.assertListEqual(data4, data3)
self.assertListEqual(data4, [NumericRange(1989)])
qs = PostgresModel.objects.filter(
int_range__contains=NumericRange(1950, 1990))
with self.assertNumQueries(1):
data5 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data6 = [o.int_range for o in qs.all()]
self.assertListEqual(data6, data5)
self.assertListEqual(data6, [NumericRange(1900, 2000)])
qs = PostgresModel.objects.filter(
int_range__contained_by=NumericRange(0, 2050))
with self.assertNumQueries(1):
data5 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data6 = [o.int_range for o in qs.all()]
self.assertListEqual(data6, data5)
self.assertListEqual(data6, [NumericRange(1900, 2000)])
qs = PostgresModel.objects.filter(int_range__fully_lt=(2015, None))
with self.assertNumQueries(1):
data7 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data8 = [o.int_range for o in qs.all()]
self.assertListEqual(data8, data7)
self.assertListEqual(data8, [NumericRange(1900, 2000)])
qs = PostgresModel.objects.filter(int_range__fully_gt=(1970, 1980))
with self.assertNumQueries(1):
data9 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data10 = [o.int_range for o in qs.all()]
self.assertListEqual(data10, data9)
self.assertListEqual(data10, [NumericRange(1989)])
qs = PostgresModel.objects.filter(int_range__not_lt=(1970, 1980))
with self.assertNumQueries(1):
data11 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data12 = [o.int_range for o in qs.all()]
self.assertListEqual(data12, data11)
self.assertListEqual(data12, [NumericRange(1989)])
qs = PostgresModel.objects.filter(int_range__not_gt=(1970, 1980))
with self.assertNumQueries(1):
data13 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data14 = [o.int_range for o in qs.all()]
self.assertListEqual(data14, data13)
self.assertListEqual(data14, [])
qs = PostgresModel.objects.filter(int_range__adjacent_to=(1900, 1989))
with self.assertNumQueries(1):
data15 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data16 = [o.int_range for o in qs.all()]
self.assertListEqual(data16, data15)
self.assertListEqual(data16, [NumericRange(1989)])
qs = PostgresModel.objects.filter(int_range__startswith=1900)
with self.assertNumQueries(1):
data17 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data18 = [o.int_range for o in qs.all()]
self.assertListEqual(data18, data17)
self.assertListEqual(data18, [NumericRange(1900, 2000)])
qs = PostgresModel.objects.filter(int_range__endswith=2000)
with self.assertNumQueries(1):
data19 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data20 = [o.int_range for o in qs.all()]
self.assertListEqual(data20, data19)
self.assertListEqual(data20, [NumericRange(1900, 2000)])
PostgresModel.objects.create(int_range=[1900, 1900])
qs = PostgresModel.objects.filter(int_range__isempty=True)
with self.assertNumQueries(1):
data21 = [o.int_range for o in qs.all()]
with self.assertNumQueries(0):
data22 = [o.int_range for o in qs.all()]
self.assertListEqual(data22, data21)
self.assertListEqual(data22, [NumericRange(empty=True)])
def test_float_range(self):
with self.assertNumQueries(1):
data1 = [o.float_range for o in PostgresModel.objects.all()]
with self.assertNumQueries(0):
data2 = [o.float_range for o in PostgresModel.objects.all()]
self.assertListEqual(data2, data1)
# For a strange reason, probably a misconception in psycopg2
# or a bad name in django.contrib.postgres (less probable),
# FloatRange returns decimals instead of floats.
self.assertListEqual(data2, [
NumericRange(Decimal('-1000.0'), Decimal('9.87654321')),
NumericRange(Decimal('0.0'))])
def test_date_range(self):
with self.assertNumQueries(1):
data1 = [o.date_range for o in PostgresModel.objects.all()]
with self.assertNumQueries(0):
data2 = [o.date_range for o in PostgresModel.objects.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [
DateRange(date(1678, 3, 4), date(1741, 7, 28)),
DateRange(date(1989, 1, 30))])
def test_datetime_range(self):
with self.assertNumQueries(1):
data1 = [o.datetime_range for o in PostgresModel.objects.all()]
with self.assertNumQueries(0):
data2 = [o.datetime_range for o in PostgresModel.objects.all()]
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [
DateTimeTZRange(datetime(1989, 1, 30, 12, 20,
tzinfo=timezone('Europe/Paris'))),
DateTimeTZRange(bounds='()')])
@skipUnless(DJANGO_GTE_1_9,
'TransactionNow is only available in Django >= 1.9')
def test_transaction_now(self):
"""
Checks that queries with a TransactionNow() parameter are not cached.
"""
obj = Test.objects.create(datetime='1992-07-02T12:00:00')
qs = Test.objects.filter(
datetime__lte=TransactionNow())
with self.assertNumQueries(1):
obj1 = qs.get()
with self.assertNumQueries(1):
obj2 = qs.get()
self.assertEqual(obj1, obj2)
self.assertEqual(obj1, obj)