Many deprecation warnings on Python 3.10 when Dask calls Tornado while saving a Parquet file on Linux #7577

Description

@viper7882

The Tornado maintainer has provided a comment in @tornadoweb/tornado#3033 (comment), where he says:

    You'll need to change your startup code from something like IOLoop.current().start() to use asyncio.run() instead.
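
A minimal sketch of that migration as I understand it (the async main below is purely illustrative):

    import asyncio

    from tornado.ioloop import IOLoop

    # Old startup pattern, deprecated in Tornado 6.2 when no asyncio event
    # loop is running yet:
    #
    #     IOLoop.current().start()


    async def main():
        # Inside asyncio.run() an event loop is already running, so
        # IOLoop.current() resolves to it without a DeprecationWarning.
        io_loop = IOLoop.current()
        print(io_loop)


    asyncio.run(main())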

According to /home/issue/dask/venv/lib/python3.10/site-packages/tornado/ioloop.py, lines 260-262:

        .. deprecated:: 6.2
           It is deprecated to call ``IOLoop.current()`` when no `asyncio`
           event loop is running.

I believe these deprecation warnings were added in the latest Tornado release, v6.2.

For dask, I suspect the issue comes from /home/issue/dask/venv/lib/python3.10/site-packages/distributed/utils.py, lines 446-448, where dask distributed still constructs a Tornado IOLoop directly rather than relying on asyncio:

                # We're expecting the loop to run in another thread,
                # avoid re-using this thread's assigned loop
                self._loop = IOLoop()

but I understand that you are probably in the best position to comment on the dask and distributed side of this.
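
For what it's worth, the deprecated path seems reachable with Tornado alone; a minimal sketch, assuming Tornado 6.2 on Python 3.10 with no asyncio event loop set up yet (I have not verified that this emits the warnings in isolation):

    import warnings

    from tornado.ioloop import IOLoop

    # DeprecationWarnings raised inside library code are hidden by default
    # outside of unittest/pytest, so surface them explicitly.
    warnings.simplefilter("always", DeprecationWarning)

    # Constructing an IOLoop while no asyncio event loop is running goes
    # through IOLoop.make_current() and asyncio.get_event_loop(), the calls
    # named in the observed warnings below.
    loop = IOLoop()
    loop.close()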

Environment
OS: Ubuntu 22.04.2 LTS
Python: 3.10.6
Dask version: 2023.2.0
distributed version: 2023.2.0
Tornado: 6.2

P.S.: I've tried to simplify the code below, but the simplified version no longer emits the deprecation warnings, and I have not been able to pin down which condition triggers them.
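
One possible reason the simplified script goes quiet is warning visibility rather than the code itself: unittest enables the default warnings filter, while a plain script only shows DeprecationWarnings raised directly in __main__, and these are raised inside tornado. A small sketch for pinning down the trigger is to escalate just this warning into an error so the traceback shows the exact dask/distributed call (standard library only, nothing dask-specific):

    import warnings

    # Turn this specific deprecation into an exception; the resulting
    # traceback points at whichever call reaches the deprecated IOLoop path.
    warnings.filterwarnings(
        "error",
        category=DeprecationWarning,
        message="There is no current event loop",
    )

Running the reproduction with this filter in place (or with python -W error::DeprecationWarning) should fail at the first offending call instead of only printing the warning.
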
Code to reproduce:

import json
import os
import platform
import shutil
import unittest

import dask.dataframe as dd
import pandas as pd

from dask.distributed import Client as dask_client
from pathlib import Path

PARQUET_FILE_NAME_EXTENSION = ".parquet"


def delete_directory_and_its_files(dir_path, disable_messages=False):
    # Check if directory exists
    if os.path.exists(dir_path) and os.path.isdir(dir_path):
        # Remove all files in directory
        for filename in os.listdir(dir_path):
            file_path = os.path.join(dir_path, filename)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
                    if not disable_messages:
                        print("INFO: Deleted {}...".format(file_path))
            except Exception as e:
                print(f"Error deleting {file_path}: {e}")

        # Remove directory
        try:
            shutil.rmtree(dir_path)
            if not disable_messages:
                print("INFO: Deleted {}".format(dir_path))
        except Exception as e:
            print(f"Error deleting directory {dir_path}: {e}")


def rename_directory(from_dir, to_dir):
    # Make the directory
    if not os.path.exists(to_dir):
        Path(to_dir).mkdir(parents=True, exist_ok=True)

    for file in os.listdir(from_dir):
        shutil.move(os.path.join(from_dir, file), to_dir)


def save_parquet_file(params: dict) -> None:
    '''
    :param params: df : pandas DataFrame whose contents must be JSON-serializable
                   parquet_dir_and_file_path : complete path of the parquet directory and files. If the
                                               parent path does not exist, dask will create it automatically.

                   delete_parquet_dir_and_file_path_first : True to clear the parquet directory and files first
                   max_records : data retention policy: the maximum number of rows this function should keep
                   npartitions : number of dask partitions, default is 1 for best performance
                   engine : defaults to pyarrow
                   compression : defaults to snappy

    :return: None
    '''
    # Un-serialize Params
    df = params['df']
    parquet_dir_and_file_path = params['parquet_dir_and_file_path']

    # Optional Params
    delete_parquet_dir_and_file_path_first = params.get('delete_parquet_dir_and_file_path_first', None)
    max_records = params.get('max_records', None)
    npartitions = params.get('npartitions', 1)
    engine = params.get('engine', 'pyarrow')
    compression = params.get('compression', 'snappy')

    if delete_parquet_dir_and_file_path_first:
        delete_directory_and_its_files(parquet_dir_and_file_path)

    with dask_client() as client:
        ddf = dd.from_pandas(df, npartitions=npartitions)
        if not os.path.exists(parquet_dir_and_file_path):
            ddf.to_parquet(parquet_dir_and_file_path, engine=engine, compression=compression, overwrite=True)
        else:
            ddf.to_parquet(parquet_dir_and_file_path, engine=engine, compression=compression, append=True,
                           ignore_divisions=True)

        # Compute the persisted task graph to release its resources
        ddf.compute()

        # Persist the Dask DataFrame to release the resources of the previous task
        ddf.persist()
        print("INFO: Saved {} record(s) into {}".format(len(df.index), parquet_dir_and_file_path))

    # If data retention policy presents
    if max_records is not None:
        # Legality Check
        assert isinstance(max_records, int)

        with dask_client() as client:
            # Data retention policy: check if there is a need to remove excessive old records
            ddf = dd.read_parquet(parquet_dir_and_file_path)

            # Get the number of records in the parquet file
            record_count = ddf.compute().shape[0]
            print("INFO: {}, record_count: {} vs max_records: {}".format(
                parquet_dir_and_file_path, record_count, max_records))

            # Check if the record count exceeds max_records
            if record_count > max_records:
                # Remove the excess records
                ddf = ddf.tail(max_records, compute=False)

                # Compute the persisted task graph to release its resources
                ddf.compute()

                # Persist the Dask DataFrame to release the resources of the previous task
                ddf.persist()

                temp_parquet_file_name = "temp" + PARQUET_FILE_NAME_EXTENSION
                temp_parquet_dir_and_file_path = \
                    os.path.join(os.path.abspath(os.path.dirname(os.path.abspath(parquet_dir_and_file_path))),
                                 temp_parquet_file_name)
                delete_directory_and_its_files(temp_parquet_dir_and_file_path)

                dd.to_parquet(ddf, temp_parquet_dir_and_file_path, engine='pyarrow', compression='snappy',
                              write_index=True, overwrite=True, append=False)

                delete_directory_and_its_files(parquet_dir_and_file_path)
                rename_directory(temp_parquet_dir_and_file_path, parquet_dir_and_file_path)

                # Compute the persisted task graph to release its resources
                dd.compute()

                delete_directory_and_its_files(temp_parquet_dir_and_file_path)


class File_System__TestCases(unittest.TestCase):
    def test__write__http__orderbook(self):
        http__orderbook = \
            {'event_timestamp': 1676746901163,
             'next_timestamp': 1676747700000,
             'lastUpdateId': 5145176151}

        # Extracted the asks and bids to simulate permutation of orderbook_entries_to_snap value
        asks = [['23134.12000000', '0.00557000'],
                ['23134.14000000', '0.00167000'],
                ['23134.19000000', '0.00190000'],
                ['23134.20000000', '0.00214000'],
                ['23134.26000000', '0.00261000'],
                ['23134.32000000', '0.00107000'],
                ['23134.37000000', '0.01800000'],
                ['23134.65000000', '0.02000000'],
                ['23134.88000000', '0.00300000'],
                ['23135.97000000', '0.00858000']]

        bids = [['23130.00000000', '0.01300000'],
                ['23129.97000000', '0.00095000'],
                ['23129.96000000', '0.00846000'],
                ['23129.76000000', '0.00867000'],
                ['23129.13000000', '0.01722000'],
                ['23129.00000000', '0.01300000'],
                ['23128.00000000', '0.01300000'],
                ['23127.55000000', '0.00130000'],
                ['23127.54000000', '0.02967000'],
                ['23127.49000000', '0.05723000']]

        # Enter your parameters here
        orderbook_entries_to_snap = 10
        max_http_orderbook_records = int(1e3 + 5e2)
        iterations = 3

        while len(asks) < orderbook_entries_to_snap:
            asks.append(list(asks[len(asks) - 10]))

        while len(bids) < orderbook_entries_to_snap:
            bids.append(list(bids[len(bids) - 10]))

        # Dynamically assign blocks
        http__orderbook['asks'] = asks
        http__orderbook['bids'] = bids

        ts_columns = ['event_timestamp', 'next_timestamp']
        columns = ts_columns + ['last_updated_id', 'asks', 'bids']

        buffer = []
        buffer.append((
            http__orderbook['event_timestamp'],
            http__orderbook['next_timestamp'],
            http__orderbook['lastUpdateId'],

            # Convert 'asks' and 'bids' to strings using json.dumps()
            json.dumps(http__orderbook['asks']),
            json.dumps(http__orderbook['bids']),
        ))
        df = pd.DataFrame(buffer, columns=columns)

        for ts_column in ts_columns:
            df[ts_column] = pd.to_datetime(df[ts_column], unit='ms')

        parquet_file_name = "ut" + PARQUET_FILE_NAME_EXTENSION
        parquet_dir_and_file_path = \
            os.path.join(os.path.abspath(os.path.dirname(os.path.abspath(__file__))), parquet_file_name)

        rows_to_replicate = max_http_orderbook_records

        # Replicate rows
        replicate_df = pd.concat([df] * rows_to_replicate, ignore_index=True)

        for i in range(iterations):
            # Replicate rows
            start_rows = i * rows_to_replicate

            # For append
            huge_df = replicate_df[:]
            huge_df.index += start_rows

            if i == 0:
                delete_parquet_dir_and_file_path_first = True
            else:
                delete_parquet_dir_and_file_path_first = False

            save_parquet_file__dict = dict(
                df=huge_df,
                parquet_dir_and_file_path=parquet_dir_and_file_path,

                # Optional Params
                delete_parquet_dir_and_file_path_first=delete_parquet_dir_and_file_path_first,
                max_records=max_http_orderbook_records,
            )
            save_parquet_file(params=save_parquet_file__dict)


if __name__ == '__main__':
    unittest.main()

Observed behavior:

/home/issue/dask/venv/lib/python3.10/site-packages/tornado/ioloop.py:265: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
/home/issue/dask/venv/lib/python3.10/site-packages/tornado/ioloop.py:350: DeprecationWarning: make_current is deprecated; start the event loop first
  self.make_current()
/home/issue/dask/venv/lib/python3.10/site-packages/tornado/platform/asyncio.py:360: DeprecationWarning: There is no current event loop
  self.old_asyncio = asyncio.get_event_loop()
2023-02-21 7:16:53,032 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-09lebnb0', purging
2023-02-21 7:16:53,032 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-wgw_w2n3', purging
INFO: Saved 1500 record(s) into /home/lkw/issue/dask/ut.parquet
/home/issue/dask/venv/lib/python3.10/site-packages/tornado/ioloop.py:265: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
/home/issue/dask/venv/lib/python3.10/site-packages/tornado/ioloop.py:350: DeprecationWarning: make_current is deprecated; start the event loop first
  self.make_current()
/home/issue/dask/venv/lib/python3.10/site-packages/tornado/platform/asyncio.py:360: DeprecationWarning: There is no current event loop
  self.old_asyncio = asyncio.get_event_loop()
INFO: /home/issue/dask/ut.parquet, record_count: 1500 vs max_records: 1500
.
.
.

Expected behavior:
No deprecation warnings from Tornado.
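
Until dask/distributed move off the deprecated IOLoop startup path, a possible stopgap on the user side (assuming, as the log above suggests, that the warnings are attributed to tornado modules and are otherwise harmless) is to silence only these warnings:

    import warnings

    # Stopgap only: hide the Tornado 6.2 deprecation noise without touching
    # DeprecationWarnings raised elsewhere.
    warnings.filterwarnings(
        "ignore",
        category=DeprecationWarning,
        module="tornado",
    )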
