diff --git a/django_nose/fixture_tables.py b/django_nose/fixture_tables.py index 7865e90..f9ad5d1 100644 --- a/django_nose/fixture_tables.py +++ b/django_nose/fixture_tables.py @@ -1,16 +1,23 @@ -"""A copy of Django 1.3.0's stock loaddata.py, adapted so that, instead of +"""A copy of Django 1.7.7's stock loaddata.py, adapted so that, instead of loading any data, it returns the tables referenced by a set of fixtures so we can truncate them (and no others) quickly after we're finished with them.""" -import os +from __future__ import unicode_literals + +import glob import gzip +import os +import warnings import zipfile -from itertools import product +from django.apps import apps from django.conf import settings from django.core import serializers -from django.db import router, DEFAULT_DB_ALIAS -from django.db.models import get_apps +from django.db import (connections, DEFAULT_DB_ALIAS) +from django.utils import lru_cache +from django.utils._os import upath +from django.utils.deprecation import RemovedInDjango19Warning +from itertools import product try: import bz2 @@ -20,136 +27,182 @@ def tables_used_by_fixtures(fixture_labels, using=DEFAULT_DB_ALIAS): - """Act like Django's stock loaddata command, but, instead of loading data, - return an iterable of the names of the tables into which data would be - loaded.""" + + connection = connections[using] + # Keep a count of the installed objects and fixtures fixture_count = 0 - loaded_object_count = 0 - fixture_object_count = 0 - tables = set() - - class SingleZipReader(zipfile.ZipFile): - def __init__(self, *args, **kwargs): - zipfile.ZipFile.__init__(self, *args, **kwargs) - if settings.DEBUG: - assert len(self.namelist()) == 1, "Zip-compressed fixtures must contain only one file." - def read(self): - return zipfile.ZipFile.read(self, self.namelist()[0]) - - compression_types = { - None: file, - 'gz': gzip.GzipFile, - 'zip': SingleZipReader - } + models = set() + if has_bz2: - compression_types['bz2'] = bz2.BZ2File - - app_module_paths = [] - for app in get_apps(): - if hasattr(app, '__path__'): - # It's a 'models/' subpackage - for path in app.__path__: - app_module_paths.append(path) + compression_formats['bz2'] = (bz2.BZ2File, 'r') + + with connection.constraint_checks_disabled(): + for fixture_label in fixture_labels: + models.update(get_models(fixture_label, using)) + + # Since we disabled constraint checks, we must manually check for + # any invalid keys that might have been added + table_names = {model._meta.db_table for model in models} + return table_names + + +def get_models(fixture_label, using): + """ + Loads fixtures files for a given label. + """ + models = set() + for fixture_file, fixture_dir, fixture_name in find_fixtures(fixture_label, using): + _, ser_fmt, cmp_fmt = parse_name(os.path.basename(fixture_file)) + open_method, mode = compression_formats[cmp_fmt] + fixture = open_method(fixture_file, mode) + try: + objects_in_fixture = 0 + loaded_objects_in_fixture = 0 + + objects = serializers.deserialize(ser_fmt, fixture, + using=using) # ignorenonexistent=self.ignore + + for obj in objects: + objects_in_fixture += 1 + models.add(obj.object.__class__) + + except Exception as e: + raise Exception("Problem installing fixture '%s': %s" % (fixture_file, e),) + finally: + fixture.close() + + # Warn if the fixture we loaded contains 0 objects. + if objects_in_fixture == 0: + warnings.warn( + "No fixture data found for '%s'. (File format may be " + "invalid.)" % fixture_name, + RuntimeWarning + ) + + return models + + +@lru_cache.lru_cache(maxsize=None) +def find_fixtures(fixture_label, using): + """ + Finds fixture files for a given label. + """ + fixture_name, ser_fmt, cmp_fmt = parse_name(fixture_label) + databases = [using, None] + cmp_fmts = list(compression_formats.keys()) if cmp_fmt is None else [cmp_fmt] + ser_fmts = serializers.get_public_serializer_formats() if ser_fmt is None else [ser_fmt] + + if os.path.isabs(fixture_name): + directories = [os.path.dirname(fixture_name)] + fixture_name = os.path.basename(fixture_name) + else: + directories = fixture_dirs() + if os.path.sep in fixture_name: + directories = [os.path.join(dir_, os.path.dirname(fixture_name)) + for dir_ in directories] + fixture_name = os.path.basename(fixture_name) + + suffixes = ('.'.join(ext for ext in combo if ext) + for combo in product(databases, ser_fmts, cmp_fmts)) + targets = set('.'.join((fixture_name, suffix)) for suffix in suffixes) + + fixture_files = [] + for fixture_dir in directories: + fixture_files_in_dir = [] + for candidate in glob.iglob(os.path.join(fixture_dir, fixture_name + '*')): + if os.path.basename(candidate) in targets: + # Save the fixture_dir and fixture_name for future error messages. + fixture_files_in_dir.append((candidate, fixture_dir, fixture_name)) + + # Check kept for backwards-compatibility; it isn't clear why + # duplicates are only allowed in different directories. + if len(fixture_files_in_dir) > 1: + raise RuntimeError( + "Multiple fixtures named '%s' in %s. Aborting." % + (fixture_name, humanize(fixture_dir))) + fixture_files.extend(fixture_files_in_dir) + + if fixture_name != 'initial_data' and not fixture_files: + # Warning kept for backwards-compatibility; why not an exception? + warnings.warn("No fixture named '%s' found." % fixture_name) + elif fixture_name == 'initial_data' and fixture_files: + warnings.warn( + 'initial_data fixtures are deprecated. Use data migrations instead.', + RemovedInDjango19Warning + ) + + return fixture_files + + +@lru_cache.lru_cache(maxsize=None) +def fixture_dirs(): + """ + Return a list of fixture directories. + + The list contains the 'fixtures' subdirectory of each installed + application, if it exists, the directories in FIXTURE_DIRS, and the + current directory. + """ + dirs = [] + for app_config in apps.get_app_configs(): + app_dir = os.path.join(app_config.path, 'fixtures') + if os.path.isdir(app_dir): + dirs.append(app_dir) + dirs.extend(list(settings.FIXTURE_DIRS)) + dirs.append('') + dirs = [upath(os.path.abspath(os.path.realpath(d))) for d in dirs] + return dirs + + +def parse_name(fixture_name): + """ + Splits fixture name in name, serialization format, compression format. + """ + parts = fixture_name.rsplit('.', 2) + + if len(parts) > 1 and parts[-1] in compression_formats: + cmp_fmt = parts[-1] + parts = parts[:-1] + else: + cmp_fmt = None + + if len(parts) > 1: + if parts[-1] in serialization_formats: + ser_fmt = parts[-1] + parts = parts[:-1] else: - # It's a models.py module - app_module_paths.append(app.__file__) + raise RuntimeError( + "Problem loading fixture '%s': %s is not a known " + "serialization format." % (''.join(parts[:-1]), parts[-1])) + else: + ser_fmt = None - app_fixtures = [os.path.join(os.path.dirname(path), 'fixtures') for path in app_module_paths] - for fixture_label in fixture_labels: - parts = fixture_label.split('.') + name = '.'.join(parts) - if len(parts) > 1 and parts[-1] in compression_types: - compression_formats = [parts[-1]] - parts = parts[:-1] - else: - compression_formats = list(compression_types.keys()) + return name, ser_fmt, cmp_fmt - if len(parts) == 1: - fixture_name = parts[0] - formats = serializers.get_public_serializer_formats() - else: - fixture_name, format = '.'.join(parts[:-1]), parts[-1] - if format in serializers.get_public_serializer_formats(): - formats = [format] - else: - formats = [] - - if not formats: - # stderr.write(style.ERROR("Problem installing fixture '%s': %s is - # not a known serialization format.\n" % (fixture_name, format))) - return set() - - if os.path.isabs(fixture_name): - fixture_dirs = [fixture_name] - else: - fixture_dirs = app_fixtures + list(settings.FIXTURE_DIRS) + [''] - - for fixture_dir in fixture_dirs: - # stdout.write("Checking %s for fixtures...\n" % - # humanize(fixture_dir)) - - label_found = False - for combo in product([using, None], formats, compression_formats): - database, format, compression_format = combo - file_name = '.'.join( - p for p in [ - fixture_name, database, format, compression_format - ] - if p - ) - - # stdout.write("Trying %s for %s fixture '%s'...\n" % \ - # (humanize(fixture_dir), file_name, fixture_name)) - full_path = os.path.join(fixture_dir, file_name) - open_method = compression_types[compression_format] - try: - fixture = open_method(full_path, 'r') - if label_found: - fixture.close() - # stderr.write(style.ERROR("Multiple fixtures named - # '%s' in %s. Aborting.\n" % (fixture_name, - # humanize(fixture_dir)))) - return set() - else: - fixture_count += 1 - objects_in_fixture = 0 - loaded_objects_in_fixture = 0 - # stdout.write("Installing %s fixture '%s' from %s.\n" - # % (format, fixture_name, humanize(fixture_dir))) - try: - objects = serializers.deserialize(format, fixture, using=using) - for obj in objects: - objects_in_fixture += 1 - if router.allow_syncdb(using, obj.object.__class__): - loaded_objects_in_fixture += 1 - tables.add( - obj.object.__class__._meta.db_table) - loaded_object_count += loaded_objects_in_fixture - fixture_object_count += objects_in_fixture - label_found = True - except (SystemExit, KeyboardInterrupt): - raise - except Exception: - fixture.close() - # stderr.write( style.ERROR("Problem installing - # fixture '%s': %s\n" % (full_path, ''.join(tra - # ceback.format_exception(sys.exc_type, - # sys.exc_value, sys.exc_traceback))))) - return set() - fixture.close() - - # If the fixture we loaded contains 0 objects, assume that an - # error was encountered during fixture loading. - if objects_in_fixture == 0: - # stderr.write( style.ERROR("No fixture data found - # for '%s'. (File format may be invalid.)\n" % - # (fixture_name))) - return set() - - except Exception: - # stdout.write("No %s fixture '%s' in %s.\n" % \ (format, - # fixture_name, humanize(fixture_dir))) - pass - - return tables + +class SingleZipReader(zipfile.ZipFile): + + def __init__(self, *args, **kwargs): + zipfile.ZipFile.__init__(self, *args, **kwargs) + if len(self.namelist()) != 1: + raise ValueError("Zip-compressed fixtures must contain one file.") + + def read(self): + return zipfile.ZipFile.read(self, self.namelist()[0]) + + +def humanize(dirname): + return "'%s'" % dirname if dirname else 'absolute path' + + +# Forcing binary mode may be revisited after dropping Python 2 support (see #22399) +compression_formats = { + None: (open, 'rb'), + 'gz': (gzip.GzipFile, 'rb'), + 'zip': (SingleZipReader, 'r'), +} + +serialization_formats = serializers.get_public_serializer_formats() diff --git a/django_nose/plugin.py b/django_nose/plugin.py index 43f13e8..5411895 100644 --- a/django_nose/plugin.py +++ b/django_nose/plugin.py @@ -98,7 +98,8 @@ def add(self, test): if is_subclass_at_all(test.context, FastFixtureTestCase): # We bucket even FFTCs that don't have any fixtures, but it # shouldn't matter. - key = (frozenset(getattr(test.context, 'fixtures', [])), + fixtures = getattr(test.context, 'fixtures', None) or [] + key = (frozenset(fixtures), getattr(test.context, 'exempt_from_fixture_bundling', False)) diff --git a/django_nose/testcases.py b/django_nose/testcases.py index a899ec3..b607d25 100644 --- a/django_nose/testcases.py +++ b/django_nose/testcases.py @@ -63,7 +63,7 @@ def tearDownClass(cls): def _fixture_setup(cls): """Load fixture data, and commit.""" for db in cls._databases(): - if (hasattr(cls, 'fixtures') and + if (getattr(cls, 'fixtures', None) and getattr(cls, '_fb_should_setup_fixtures', True)): # Iff the fixture-bundling test runner tells us we're the first # suite having these fixtures, set them up: @@ -77,7 +77,7 @@ def _fixture_setup(cls): @classmethod def _fixture_teardown(cls): """Empty (only) the tables we loaded fixtures into, then commit.""" - if hasattr(cls, 'fixtures') and \ + if getattr(cls, 'fixtures', None) and \ getattr(cls, '_fb_should_teardown_fixtures', True): # If the fixture-bundling test runner advises us that the next test # suite is going to reuse these fixtures, don't tear them down.