Implement compute(streaming=True) #1312

@hombit

Feature request

This proposes an alternative way of computing the result, so that users can trade performance (streaming=False) for lower memory use and a lower chance of worker timeouts (streaming=True). Calling .compute(streaming=True) would do something like:

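import dask.distributed
import nested_pandas as npd
import pandas as pd

def compute(self, streaming: bool = False, **kwargs):
    # Default path: materialize the whole result in a single compute call.
    if not streaming:
        return npd.NestedFrame(super().compute(**kwargs))

    # get_client() raises ValueError (it does not return None) when no client is active.
    try:
        client = dask.distributed.get_client()
        partitions_per_chunk = max(1, len(client.scheduler_info()["workers"]))
    except ValueError:
        client, partitions_per_chunk = None, 1

    # CatalogStream is used as named in this proposal; its import is not shown here.
    batches = []
    for batch in CatalogStream(self, client=client, partitions_per_chunk=partitions_per_chunk, shuffle=False):
        if len(batch) == 0:
            continue
        batches.append(batch)
    # Note: pd.concat raises ValueError if every batch was empty; that case still needs handling.
    result = pd.concat(batches)
    return npd.NestedFrame(result)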
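
To illustrate the chunking idea on its own, here is a minimal standard-library sketch. It is not the proposed implementation: `Partition`, `iter_batches`, and `compute_streaming` are hypothetical names, plain lists of dicts stand in for data frames, and `iter_batches` is only a stand-in for whatever CatalogStream does internally. It shows that only `partitions_per_chunk` partitions are evaluated at a time and that empty batches are dropped before concatenation.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator

# Hypothetical stand-in: a lazy partition is a callable returning a list of rows.
Partition = Callable[[], list]


def iter_batches(partitions: Iterable[Partition], partitions_per_chunk: int) -> Iterator[list]:
    """Yield one batch of rows per chunk of partitions."""
    if partitions_per_chunk < 1:
        raise ValueError("partitions_per_chunk must be >= 1")
    it = iter(partitions)
    while chunk := list(islice(it, partitions_per_chunk)):
        # Only the partitions of the current chunk are evaluated here.
        yield [row for part in chunk for row in part()]


def compute_streaming(partitions: Iterable[Partition], partitions_per_chunk: int = 1) -> list:
    """Collect non-empty batches and concatenate them into one result."""
    batches = [b for b in iter_batches(partitions, partitions_per_chunk) if len(b) > 0]
    return [row for batch in batches for row in batch]


# Five toy partitions; partition 2 is empty.
parts = [lambda i=i: [{"id": i, "x": i * 10}] if i != 2 else [] for i in range(5)]
result = compute_streaming(parts, partitions_per_chunk=2)
```

In this sketch, peak memory while a batch is being built is bounded by the chunk size rather than by the full catalog, although the final concatenated result still has to fit in memory, as it would with streaming=False.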

Before submitting
Please check the following:

  • I have described the purpose of the suggested change, specifying what I need the enhancement to accomplish, i.e. what problem it solves.
  • I have included any relevant links, screenshots, environment information, and data relevant to implementing the requested feature, as well as pseudocode for how I want to access the new functionality.
  • If I have ideas for how the new feature could be implemented, I have provided explanations and/or pseudocode and/or task lists for the steps.


Labels: enhancement (New feature or request), performance (For slow queries or compute bottlenecks)
