distributed/tests/test_client.py::test_upload_file_zip fails on Python 3.13 #8708

Open
@QuLogic

Description

Describe the issue:
Running with Python 3.13, the noted test fails. If I change the test to upload only one zip file, it passes. If I change it to upload two differently named zip files, it also passes.

There seems to be some kind of caching in pkgutil/zipimport that breaks when the same archive path is uploaded twice, but I'm also not sure whether this test is really sound, since it tries to load the same path twice?
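
For reference, here is a rough standalone sketch of the pattern I believe triggers it. The invalidate_caches() step mirrors what distributed.utils.import_file does after loading modules; I haven't confirmed this exact snippet fails outside of distributed:

import importlib
import os
import pkgutil
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as tmp:
    archive = os.path.join(tmp, "myfile.zip")
    for value in (123, 456):
        # Rewrite the same archive path with new contents, as the test does.
        with zipfile.ZipFile(archive, "w") as z:
            z.writestr("myfile.py", f"def f():\n    return {value}\n")
        # distributed.utils.import_file lists the archive's modules...
        print([m.name for m in pkgutil.iter_modules([archive])])
        # ...then invalidates import caches after loading them. On 3.13 the
        # cached zipimporter seems to lose its zipimport._zip_directory_cache
        # entry here, so the next iter_modules() call raises KeyError.
        importlib.invalidate_caches()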

Minimal Complete Verifiable Example:

$ python3.13 -m venv venv313
$ . venv313/bin/activate
$ pip install pytest flaky pytest-timeout -e .
$ pytest -k upload_file_zip
pytest output:
__________ test_upload_file_zip __________

c = <Client: No scheduler connected>, s = <Scheduler 'tcp://127.0.0.1:34703', workers: 0, cores: 0, tasks: 0>, a = <Worker 'tcp://127.0.0.1:33475', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33909', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_upload_file_zip(c, s, a, b):
        def g():
            import myfile
    
            return myfile.f()
    
        with save_sys_modules():
            try:
                for value in [123, 456]:
                    with tmp_text(
                        "myfile.py", f"def f():\n    return {value}"
                    ) as fn_my_file:
                        with zipfile.ZipFile("myfile.zip", "w") as z:
                            z.write(fn_my_file, arcname=os.path.basename(fn_my_file))
>                       await c.upload_file("myfile.zip")

distributed/tests/test_client.py:1716: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3824: in _
    results = await asyncio.gather(
distributed/client.py:4939: in _register_scheduler_plugin
    return await self.scheduler.register_scheduler_plugin(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/scheduler.py:6101: in register_scheduler_plugin
    await result
distributed/diagnostics/plugin.py:396: in start
    await scheduler.upload_file(self.filename, self.data, load=self.load)
distributed/core.py:563: in upload_file
    raise e
distributed/core.py:560: in upload_file
    import_file(out_filename)
distributed/utils.py:1158: in import_file
    names_to_import.extend(names)
distributed/utils.py:1157: in <genexpr>
    names = (mod_info.name for mod_info in pkgutil.iter_modules([path]))
/usr/lib64/python3.13/pkgutil.py:116: in iter_modules
    for name, ispkg in iter_importer_modules(i, prefix):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    """Utilities to support packages."""
    
    from collections import namedtuple
    from functools import singledispatch as simplegeneric
    import importlib
    import importlib.util
    import importlib.machinery
    import os
    import os.path
    import sys
    from types import ModuleType
    import warnings
    
    __all__ = [
        'get_importer', 'iter_importers', 'get_loader', 'find_loader',
        'walk_packages', 'iter_modules', 'get_data',
        'read_code', 'extend_path',
        'ModuleInfo',
    ]
    
    
    ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg')
    ModuleInfo.__doc__ = 'A namedtuple with minimal info about a module.'
    
    
    def read_code(stream):
        # This helper is needed in order for the PEP 302 emulation to
        # correctly handle compiled files
        import marshal
    
        magic = stream.read(4)
        if magic != importlib.util.MAGIC_NUMBER:
            return None
    
        stream.read(12) # Skip rest of the header
        return marshal.load(stream)
    
    
    def walk_packages(path=None, prefix='', onerror=None):
        """Yields ModuleInfo for all modules recursively
        on path, or, if path is None, all accessible modules.
    
        'path' should be either None or a list of paths to look for
        modules in.
    
        'prefix' is a string to output on the front of every module name
        on output.
    
        Note that this function must import all *packages* (NOT all
        modules!) on the given path, in order to access the __path__
        attribute to find submodules.
    
        'onerror' is a function which gets called with one argument (the
        name of the package which was being imported) if any exception
        occurs while trying to import a package.  If no onerror function is
        supplied, ImportErrors are caught and ignored, while all other
        exceptions are propagated, terminating the search.
    
        Examples:
    
        # list all modules python can access
        walk_packages()
    
        # list all submodules of ctypes
        walk_packages(ctypes.__path__, ctypes.__name__+'.')
        """
    
        def seen(p, m={}):
            if p in m:
                return True
            m[p] = True
    
        for info in iter_modules(path, prefix):
            yield info
    
            if info.ispkg:
                try:
                    __import__(info.name)
                except ImportError:
                    if onerror is not None:
                        onerror(info.name)
                except Exception:
                    if onerror is not None:
                        onerror(info.name)
                    else:
                        raise
                else:
                    path = getattr(sys.modules[info.name], '__path__', None) or []
    
                    # don't traverse path items we've seen before
                    path = [p for p in path if not seen(p)]
    
                    yield from walk_packages(path, info.name+'.', onerror)
    
    
    def iter_modules(path=None, prefix=''):
        """Yields ModuleInfo for all submodules on path,
        or, if path is None, all top-level modules on sys.path.
    
        'path' should be either None or a list of paths to look for
        modules in.
    
        'prefix' is a string to output on the front of every module name
        on output.
        """
        if path is None:
            importers = iter_importers()
        elif isinstance(path, str):
            raise ValueError("path must be None or list of paths to look for "
                            "modules in")
        else:
            importers = map(get_importer, path)
    
        yielded = {}
        for i in importers:
            for name, ispkg in iter_importer_modules(i, prefix):
                if name not in yielded:
                    yielded[name] = 1
                    yield ModuleInfo(i, name, ispkg)
    
    
    @simplegeneric
    def iter_importer_modules(importer, prefix=''):
        if not hasattr(importer, 'iter_modules'):
            return []
        return importer.iter_modules(prefix)
    
    
    # Implement a file walker for the normal importlib path hook
    def _iter_file_finder_modules(importer, prefix=''):
        if importer.path is None or not os.path.isdir(importer.path):
            return
    
        yielded = {}
        import inspect
        try:
            filenames = os.listdir(importer.path)
        except OSError:
            # ignore unreadable directories like import does
            filenames = []
        filenames.sort()  # handle packages before same-named modules
    
        for fn in filenames:
            modname = inspect.getmodulename(fn)
            if modname=='__init__' or modname in yielded:
                continue
    
            path = os.path.join(importer.path, fn)
            ispkg = False
    
            if not modname and os.path.isdir(path) and '.' not in fn:
                modname = fn
                try:
                    dircontents = os.listdir(path)
                except OSError:
                    # ignore unreadable directories like import does
                    dircontents = []
                for fn in dircontents:
                    subname = inspect.getmodulename(fn)
                    if subname=='__init__':
                        ispkg = True
                        break
                else:
                    continue    # not a package
    
            if modname and '.' not in modname:
                yielded[modname] = 1
                yield prefix + modname, ispkg
    
    iter_importer_modules.register(
        importlib.machinery.FileFinder, _iter_file_finder_modules)
    
    
    try:
        import zipimport
        from zipimport import zipimporter
    
        def iter_zipimport_modules(importer, prefix=''):
>           dirlist = sorted(zipimport._zip_directory_cache[importer.archive])
E           KeyError: '/tmp/dask-scratch-space/scheduler-belyj1dp/myfile.zip'

/usr/lib64/python3.13/pkgutil.py:179: KeyError
---------------------------------------------------------------------------------------------------------------------------------- Captured stdout call ----------------------------------------------------------------------------------------------------------------------------------
Dumped cluster state to test_cluster_dump/test_upload_file_zip.yaml
---------------------------------------------------------------------------------------------------------------------------------- Captured stderr call ----------------------------------------------------------------------------------------------------------------------------------
2024-06-19 23:11:07,494 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-06-19 23:11:07,496 - distributed.scheduler - INFO - State start
2024-06-19 23:11:07,499 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:34703
2024-06-19 23:11:07,499 - distributed.scheduler - INFO -   dashboard at:  http://127.0.0.1:41157/status
2024-06-19 23:11:07,500 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2024-06-19 23:11:07,504 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33475
2024-06-19 23:11:07,504 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33475
2024-06-19 23:11:07,504 - distributed.worker - INFO -           Worker name:                          0
2024-06-19 23:11:07,504 - distributed.worker - INFO -          dashboard at:            127.0.0.1:32889
2024-06-19 23:11:07,504 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:34703
2024-06-19 23:11:07,505 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,505 - distributed.worker - INFO -               Threads:                          1
2024-06-19 23:11:07,505 - distributed.worker - INFO -                Memory:                  14.58 GiB
2024-06-19 23:11:07,505 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-88bh7q0r
2024-06-19 23:11:07,505 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,505 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33909
2024-06-19 23:11:07,505 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33909
2024-06-19 23:11:07,505 - distributed.worker - INFO -           Worker name:                          1
2024-06-19 23:11:07,505 - distributed.worker - INFO -          dashboard at:            127.0.0.1:38847
2024-06-19 23:11:07,505 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:34703
2024-06-19 23:11:07,505 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,505 - distributed.worker - INFO -               Threads:                          2
2024-06-19 23:11:07,505 - distributed.worker - INFO -                Memory:                  14.58 GiB
2024-06-19 23:11:07,506 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-9ws5zvi8
2024-06-19 23:11:07,506 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,510 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33475', name: 0, status: init, memory: 0, processing: 0>
2024-06-19 23:11:07,510 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33475
2024-06-19 23:11:07,511 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:58754
2024-06-19 23:11:07,511 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33909', name: 1, status: init, memory: 0, processing: 0>
2024-06-19 23:11:07,512 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33909
2024-06-19 23:11:07,512 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:58760
2024-06-19 23:11:07,512 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-06-19 23:11:07,512 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-06-19 23:11:07,513 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:34703
2024-06-19 23:11:07,513 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,513 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:34703
2024-06-19 23:11:07,513 - distributed.worker - INFO - -------------------------------------------------
2024-06-19 23:11:07,513 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:34703
2024-06-19 23:11:07,513 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:34703
2024-06-19 23:11:07,517 - distributed.scheduler - INFO - Receive client connection: Client-bc9b8305-2eb2-11ef-9803-94de8078e5f9
2024-06-19 23:11:07,518 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:58772
2024-06-19 23:11:07,520 - distributed.utils - INFO - Reload module myfile from .zip file
2024-06-19 23:11:07,521 - distributed.scheduler - INFO - Registering Worker plugin myfile.zip27b46e23-4d3a-4005-af03-5c315e636c17
2024-06-19 23:11:07,523 - distributed.worker - INFO - Starting Worker plugin myfile.zip27b46e23-4d3a-4005-af03-5c315e636c17
2024-06-19 23:11:07,523 - distributed.utils - INFO - Reload module myfile from .zip file
2024-06-19 23:11:07,524 - distributed.worker - INFO - Starting Worker plugin myfile.zip27b46e23-4d3a-4005-af03-5c315e636c17
2024-06-19 23:11:07,524 - distributed.utils - INFO - Reload module myfile from .zip file
2024-06-19 23:11:07,535 - distributed.core - ERROR - '/tmp/dask-scratch-space/scheduler-belyj1dp/myfile.zip'
Traceback (most recent call last):
  File "/home/elliott/code/distributed/distributed/core.py", line 560, in upload_file
    import_file(out_filename)
    ~~~~~~~~~~~^^^^^^^^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1158, in import_file
    names_to_import.extend(names)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1157, in <genexpr>
    names = (mod_info.name for mod_info in pkgutil.iter_modules([path]))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/pkgutil.py", line 116, in iter_modules
    for name, ispkg in iter_importer_modules(i, prefix):
    ...<2 lines>...
            yield ModuleInfo(i, name, ispkg)
  File "/usr/lib64/python3.13/pkgutil.py", line 179, in iter_zipimport_modules
    dirlist = sorted(zipimport._zip_directory_cache[importer.archive])
                     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: '/tmp/dask-scratch-space/scheduler-belyj1dp/myfile.zip'
2024-06-19 23:11:07,536 - distributed.core - ERROR - Exception while handling op register_scheduler_plugin
Traceback (most recent call last):
  File "/home/elliott/code/distributed/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/elliott/code/distributed/distributed/scheduler.py", line 6101, in register_scheduler_plugin
    await result
  File "/home/elliott/code/distributed/distributed/diagnostics/plugin.py", line 396, in start
    await scheduler.upload_file(self.filename, self.data, load=self.load)
  File "/home/elliott/code/distributed/distributed/core.py", line 563, in upload_file
    raise e
  File "/home/elliott/code/distributed/distributed/core.py", line 560, in upload_file
    import_file(out_filename)
    ~~~~~~~~~~~^^^^^^^^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1158, in import_file
    names_to_import.extend(names)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1157, in <genexpr>
    names = (mod_info.name for mod_info in pkgutil.iter_modules([path]))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/pkgutil.py", line 116, in iter_modules
    for name, ispkg in iter_importer_modules(i, prefix):
    ...<2 lines>...
            yield ModuleInfo(i, name, ispkg)
  File "/usr/lib64/python3.13/pkgutil.py", line 179, in iter_zipimport_modules
    dirlist = sorted(zipimport._zip_directory_cache[importer.archive])
                     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: '/tmp/dask-scratch-space/scheduler-belyj1dp/myfile.zip'
2024-06-19 23:11:07,568 - distributed.scheduler - INFO - Registering Worker plugin myfile.zipbf83a2b2-6077-46d1-b29f-95eeaf2cae3d
2024-06-19 23:11:07,569 - distributed.worker - INFO - Starting Worker plugin myfile.zipbf83a2b2-6077-46d1-b29f-95eeaf2cae3d
2024-06-19 23:11:07,569 - distributed.core - ERROR - '/tmp/dask-scratch-space/worker-88bh7q0r/myfile.zip'
Traceback (most recent call last):
  File "/home/elliott/code/distributed/distributed/core.py", line 560, in upload_file
    import_file(out_filename)
    ~~~~~~~~~~~^^^^^^^^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1158, in import_file
    names_to_import.extend(names)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/home/elliott/code/distributed/distributed/utils.py", line 1157, in <genexpr>
    names = (mod_info.name for mod_info in pkgutil.iter_modules([path]))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/pkgutil.py", line 116, in iter_modules
    for name, ispkg in iter_importer_modules(i, prefix):
    ...<2 lines>...
            yield ModuleInfo(i, name, ispkg)
  File "/usr/lib64/python3.13/pkgutil.py", line 179, in iter_zipimport_modules
    dirlist = sorted(zipimport._zip_directory_cache[importer.archive])
                     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: '/tmp/dask-scratch-space/worker-88bh7q0r/myfile.zip'
2024-06-19 23:11:07,634 - distributed.worker - INFO - Starting Worker plugin myfile.zipbf83a2b2-6077-46d1-b29f-95eeaf2cae3d
2024-06-19 23:11:07,635 - distributed.utils - INFO - Reload module myfile from .zip file
2024-06-19 23:11:07,636 - distributed.scheduler - INFO - Remove client Client-bc9b8305-2eb2-11ef-9803-94de8078e5f9
2024-06-19 23:11:07,636 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:58772; closing.
2024-06-19 23:11:07,636 - distributed.scheduler - INFO - Remove client Client-bc9b8305-2eb2-11ef-9803-94de8078e5f9
2024-06-19 23:11:07,637 - distributed.scheduler - INFO - Close client connection: Client-bc9b8305-2eb2-11ef-9803-94de8078e5f9
2024-06-19 23:11:07,638 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33475. Reason: worker-close
2024-06-19 23:11:07,638 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33909. Reason: worker-close
2024-06-19 23:11:07,640 - distributed.core - INFO - Connection to tcp://127.0.0.1:34703 has been closed.
2024-06-19 23:11:07,640 - distributed.core - INFO - Connection to tcp://127.0.0.1:34703 has been closed.
2024-06-19 23:11:07,640 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:58754; closing.
2024-06-19 23:11:07,640 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:58760; closing.
2024-06-19 23:11:07,641 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:33475', name: 0, status: closing, memory: 0, processing: 0> (stimulus_id='handle-worker-cleanup-1718853067.6411417')
2024-06-19 23:11:07,641 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:33909', name: 1, status: closing, memory: 0, processing: 0> (stimulus_id='handle-worker-cleanup-1718853067.6413925')
2024-06-19 23:11:07,641 - distributed.scheduler - INFO - Lost all workers
2024-06-19 23:11:07,642 - distributed.scheduler - INFO - Scheduler closing due to unknown reason...
2024-06-19 23:11:07,642 - distributed.scheduler - INFO - Scheduler closing all comms
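
If the stale cache entry is indeed the cause, something along these lines in distributed.utils.import_file might work around it. This is an untested sketch; _zip_directory_cache is a private CPython detail, and _clear_zip_caches is a hypothetical helper:

import importlib
import sys
import zipimport

def _clear_zip_caches(path: str) -> None:
    # Hypothetical helper, untested: drop any cached importer state for
    # `path` so pkgutil.iter_modules() builds a fresh zipimporter instead
    # of reusing one whose directory cache entry has been evicted.
    sys.path_importer_cache.pop(path, None)
    zipimport._zip_directory_cache.pop(path, None)  # private CPython detail
    importlib.invalidate_caches()

Alternatively, the test could simply use a fresh archive name per iteration, which is the variant that already passes.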

Environment:

  • Dask version: c669cc4
  • Python version: Python 3.13.0b2
  • Operating System: Fedora 39
  • Install method (conda, pip, source): source
