Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 184 additions & 131 deletions django_nose/fixture_tables.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
"""A copy of Django 1.3.0's stock loaddata.py, adapted so that, instead of
"""A copy of Django 1.7.7's stock loaddata.py, adapted so that, instead of
loading any data, it returns the tables referenced by a set of fixtures so we
can truncate them (and no others) quickly after we're finished with them."""

import os
from __future__ import unicode_literals

import glob
import gzip
import os
import warnings
import zipfile
from itertools import product

from django.apps import apps
from django.conf import settings
from django.core import serializers
from django.db import router, DEFAULT_DB_ALIAS
from django.db.models import get_apps
from django.db import (connections, DEFAULT_DB_ALIAS)
from django.utils import lru_cache
from django.utils._os import upath
from django.utils.deprecation import RemovedInDjango19Warning
from itertools import product

try:
import bz2
Expand All @@ -20,136 +27,182 @@


def tables_used_by_fixtures(fixture_labels, using=DEFAULT_DB_ALIAS):
"""Act like Django's stock loaddata command, but, instead of loading data,
return an iterable of the names of the tables into which data would be
loaded."""

connection = connections[using]

# Keep a count of the installed objects and fixtures
fixture_count = 0
loaded_object_count = 0
fixture_object_count = 0
tables = set()

class SingleZipReader(zipfile.ZipFile):
def __init__(self, *args, **kwargs):
zipfile.ZipFile.__init__(self, *args, **kwargs)
if settings.DEBUG:
assert len(self.namelist()) == 1, "Zip-compressed fixtures must contain only one file."
def read(self):
return zipfile.ZipFile.read(self, self.namelist()[0])

compression_types = {
None: file,
'gz': gzip.GzipFile,
'zip': SingleZipReader
}
models = set()

if has_bz2:
compression_types['bz2'] = bz2.BZ2File

app_module_paths = []
for app in get_apps():
if hasattr(app, '__path__'):
# It's a 'models/' subpackage
for path in app.__path__:
app_module_paths.append(path)
compression_formats['bz2'] = (bz2.BZ2File, 'r')

with connection.constraint_checks_disabled():
for fixture_label in fixture_labels:
models.update(get_models(fixture_label, using))

# Since we disabled constraint checks, we must manually check for
# any invalid keys that might have been added
table_names = {model._meta.db_table for model in models}
return table_names


def get_models(fixture_label, using):
"""
Loads fixtures files for a given label.
"""
models = set()
for fixture_file, fixture_dir, fixture_name in find_fixtures(fixture_label, using):
_, ser_fmt, cmp_fmt = parse_name(os.path.basename(fixture_file))
open_method, mode = compression_formats[cmp_fmt]
fixture = open_method(fixture_file, mode)
try:
objects_in_fixture = 0
loaded_objects_in_fixture = 0

objects = serializers.deserialize(ser_fmt, fixture,
using=using) # ignorenonexistent=self.ignore

for obj in objects:
objects_in_fixture += 1
models.add(obj.object.__class__)

except Exception as e:
raise Exception("Problem installing fixture '%s': %s" % (fixture_file, e),)
finally:
fixture.close()

# Warn if the fixture we loaded contains 0 objects.
if objects_in_fixture == 0:
warnings.warn(
"No fixture data found for '%s'. (File format may be "
"invalid.)" % fixture_name,
RuntimeWarning
)

return models


@lru_cache.lru_cache(maxsize=None)
def find_fixtures(fixture_label, using):
"""
Finds fixture files for a given label.
"""
fixture_name, ser_fmt, cmp_fmt = parse_name(fixture_label)
databases = [using, None]
cmp_fmts = list(compression_formats.keys()) if cmp_fmt is None else [cmp_fmt]
ser_fmts = serializers.get_public_serializer_formats() if ser_fmt is None else [ser_fmt]

if os.path.isabs(fixture_name):
directories = [os.path.dirname(fixture_name)]
fixture_name = os.path.basename(fixture_name)
else:
directories = fixture_dirs()
if os.path.sep in fixture_name:
directories = [os.path.join(dir_, os.path.dirname(fixture_name))
for dir_ in directories]
fixture_name = os.path.basename(fixture_name)

suffixes = ('.'.join(ext for ext in combo if ext)
for combo in product(databases, ser_fmts, cmp_fmts))
targets = set('.'.join((fixture_name, suffix)) for suffix in suffixes)

fixture_files = []
for fixture_dir in directories:
fixture_files_in_dir = []
for candidate in glob.iglob(os.path.join(fixture_dir, fixture_name + '*')):
if os.path.basename(candidate) in targets:
# Save the fixture_dir and fixture_name for future error messages.
fixture_files_in_dir.append((candidate, fixture_dir, fixture_name))

# Check kept for backwards-compatibility; it isn't clear why
# duplicates are only allowed in different directories.
if len(fixture_files_in_dir) > 1:
raise RuntimeError(
"Multiple fixtures named '%s' in %s. Aborting." %
(fixture_name, humanize(fixture_dir)))
fixture_files.extend(fixture_files_in_dir)

if fixture_name != 'initial_data' and not fixture_files:
# Warning kept for backwards-compatibility; why not an exception?
warnings.warn("No fixture named '%s' found." % fixture_name)
elif fixture_name == 'initial_data' and fixture_files:
warnings.warn(
'initial_data fixtures are deprecated. Use data migrations instead.',
RemovedInDjango19Warning
)

return fixture_files


@lru_cache.lru_cache(maxsize=None)
def fixture_dirs():
"""
Return a list of fixture directories.

The list contains the 'fixtures' subdirectory of each installed
application, if it exists, the directories in FIXTURE_DIRS, and the
current directory.
"""
dirs = []
for app_config in apps.get_app_configs():
app_dir = os.path.join(app_config.path, 'fixtures')
if os.path.isdir(app_dir):
dirs.append(app_dir)
dirs.extend(list(settings.FIXTURE_DIRS))
dirs.append('')
dirs = [upath(os.path.abspath(os.path.realpath(d))) for d in dirs]
return dirs


def parse_name(fixture_name):
"""
Splits fixture name in name, serialization format, compression format.
"""
parts = fixture_name.rsplit('.', 2)

if len(parts) > 1 and parts[-1] in compression_formats:
cmp_fmt = parts[-1]
parts = parts[:-1]
else:
cmp_fmt = None

if len(parts) > 1:
if parts[-1] in serialization_formats:
ser_fmt = parts[-1]
parts = parts[:-1]
else:
# It's a models.py module
app_module_paths.append(app.__file__)
raise RuntimeError(
"Problem loading fixture '%s': %s is not a known "
"serialization format." % (''.join(parts[:-1]), parts[-1]))
else:
ser_fmt = None

app_fixtures = [os.path.join(os.path.dirname(path), 'fixtures') for path in app_module_paths]
for fixture_label in fixture_labels:
parts = fixture_label.split('.')
name = '.'.join(parts)

if len(parts) > 1 and parts[-1] in compression_types:
compression_formats = [parts[-1]]
parts = parts[:-1]
else:
compression_formats = list(compression_types.keys())
return name, ser_fmt, cmp_fmt

if len(parts) == 1:
fixture_name = parts[0]
formats = serializers.get_public_serializer_formats()
else:
fixture_name, format = '.'.join(parts[:-1]), parts[-1]
if format in serializers.get_public_serializer_formats():
formats = [format]
else:
formats = []

if not formats:
# stderr.write(style.ERROR("Problem installing fixture '%s': %s is
# not a known serialization format.\n" % (fixture_name, format)))
return set()

if os.path.isabs(fixture_name):
fixture_dirs = [fixture_name]
else:
fixture_dirs = app_fixtures + list(settings.FIXTURE_DIRS) + ['']

for fixture_dir in fixture_dirs:
# stdout.write("Checking %s for fixtures...\n" %
# humanize(fixture_dir))

label_found = False
for combo in product([using, None], formats, compression_formats):
database, format, compression_format = combo
file_name = '.'.join(
p for p in [
fixture_name, database, format, compression_format
]
if p
)

# stdout.write("Trying %s for %s fixture '%s'...\n" % \
# (humanize(fixture_dir), file_name, fixture_name))
full_path = os.path.join(fixture_dir, file_name)
open_method = compression_types[compression_format]
try:
fixture = open_method(full_path, 'r')
if label_found:
fixture.close()
# stderr.write(style.ERROR("Multiple fixtures named
# '%s' in %s. Aborting.\n" % (fixture_name,
# humanize(fixture_dir))))
return set()
else:
fixture_count += 1
objects_in_fixture = 0
loaded_objects_in_fixture = 0
# stdout.write("Installing %s fixture '%s' from %s.\n"
# % (format, fixture_name, humanize(fixture_dir)))
try:
objects = serializers.deserialize(format, fixture, using=using)
for obj in objects:
objects_in_fixture += 1
if router.allow_syncdb(using, obj.object.__class__):
loaded_objects_in_fixture += 1
tables.add(
obj.object.__class__._meta.db_table)
loaded_object_count += loaded_objects_in_fixture
fixture_object_count += objects_in_fixture
label_found = True
except (SystemExit, KeyboardInterrupt):
raise
except Exception:
fixture.close()
# stderr.write( style.ERROR("Problem installing
# fixture '%s': %s\n" % (full_path, ''.join(tra
# ceback.format_exception(sys.exc_type,
# sys.exc_value, sys.exc_traceback)))))
return set()
fixture.close()

# If the fixture we loaded contains 0 objects, assume that an
# error was encountered during fixture loading.
if objects_in_fixture == 0:
# stderr.write( style.ERROR("No fixture data found
# for '%s'. (File format may be invalid.)\n" %
# (fixture_name)))
return set()

except Exception:
# stdout.write("No %s fixture '%s' in %s.\n" % \ (format,
# fixture_name, humanize(fixture_dir)))
pass

return tables

class SingleZipReader(zipfile.ZipFile):

def __init__(self, *args, **kwargs):
zipfile.ZipFile.__init__(self, *args, **kwargs)
if len(self.namelist()) != 1:
raise ValueError("Zip-compressed fixtures must contain one file.")

def read(self):
return zipfile.ZipFile.read(self, self.namelist()[0])


def humanize(dirname):
return "'%s'" % dirname if dirname else 'absolute path'


# Forcing binary mode may be revisited after dropping Python 2 support (see #22399)
compression_formats = {
None: (open, 'rb'),
'gz': (gzip.GzipFile, 'rb'),
'zip': (SingleZipReader, 'r'),
}

serialization_formats = serializers.get_public_serializer_formats()
3 changes: 2 additions & 1 deletion django_nose/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def add(self, test):
if is_subclass_at_all(test.context, FastFixtureTestCase):
# We bucket even FFTCs that don't have any fixtures, but it
# shouldn't matter.
key = (frozenset(getattr(test.context, 'fixtures', [])),
fixtures = getattr(test.context, 'fixtures', None) or []
key = (frozenset(fixtures),
getattr(test.context,
'exempt_from_fixture_bundling',
False))
Expand Down
4 changes: 2 additions & 2 deletions django_nose/testcases.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def tearDownClass(cls):
def _fixture_setup(cls):
"""Load fixture data, and commit."""
for db in cls._databases():
if (hasattr(cls, 'fixtures') and
if (getattr(cls, 'fixtures', None) and
getattr(cls, '_fb_should_setup_fixtures', True)):
# Iff the fixture-bundling test runner tells us we're the first
# suite having these fixtures, set them up:
Expand All @@ -77,7 +77,7 @@ def _fixture_setup(cls):
@classmethod
def _fixture_teardown(cls):
"""Empty (only) the tables we loaded fixtures into, then commit."""
if hasattr(cls, 'fixtures') and \
if getattr(cls, 'fixtures', None) and \
getattr(cls, '_fb_should_teardown_fixtures', True):
# If the fixture-bundling test runner advises us that the next test
# suite is going to reuse these fixtures, don't tear them down.
Expand Down