Example for using a separate threadpool for CPU bound work (try 2) #14286

Draft

alamb wants to merge 1 commit into main from alamb/threadpool_example3

Conversation

@alamb
Contributor

alamb commented Jan 24, 2025

Note: This PR contains a (substantial) example and supporting code. It has no changes to the core.

Which issue does this PR close?

Rationale for this change

I have heard from multiple people, multiple times, that the specifics of using separate threadpools for CPU and IO work in DataFusion are confusing.

However, this is a key detail for building high-performance engines which process data directly from remote storage, which I think is a very important capability for DataFusion.

My past attempts to make this example have been bogged down trying to get consensus on details of how to transfer results across streams, the wisdom of wrapping streams, and other details.

However, there have been no other actual alternatives proposed (aka no actual code that I could write an example with). So while the approach of wrapping streams may be a bit ugly, it is working for us in production at InfluxDB and I believe @adriangb said they are using the same strategy at Pydantic.

In my opinion, the community needs something that works, even if it is not optimal, and that is what this example provides.

I would personally like to merge it in so it is easier to find (and not locked in a PR) and iterate on it afterwards.

What changes are included in this PR?

  1. thread_pools.rs example
  2. a dedicated_executor.rs module

Note that the DedicatedExecutor code is originally from

  1. InfluxDB 3.0 (source link), largely written by @tustvold and @crepererum
  2. The IoObjectStore wrapper is based on work from @matthewmturne in Add DedicatedExecutor to FlightSQL Server datafusion-contrib/datafusion-dft#247

Note that I have purposely avoided any changes to the DataFusion crates (such as adding DedicatedExecutor to physical-plan), but I have written the code with tests with an eye towards doing exactly such a thing once we have some experience / feedback on the approach. This is tracked by

Right now, I think the most important thing is to get this example in for people to try and confirm if it helps them

Are these changes tested?

Yes, the example is run as part of CI and there are tests.

TODO: Verify that the tests in the examples run in CI

Are there any user-facing changes?

Not really

@adriangb
Contributor

I think this is great Andrew. For what it's worth if this were packaged up in some installable way (even if it had to be from git, etc.) I'm sure we'd be super happy to can our custom stuff and use this :)

@alamb force-pushed the alamb/threadpool_example3 branch from df43a02 to b5d4ae1 on January 25, 2025 13:38
@alamb
Contributor Author

alamb commented Jan 25, 2025

I think this is great Andrew. For what it's worth if this were packaged up in some installable way (even if it had to be from git, etc.) I'm sure we'd be super happy to can our custom stuff and use this :)

Thank you 🙏 If this is the case I would be happy to make a PR to put DedicatedExecutor into the core. If I had some evidence that people would actually use this, I could make a more compelling case for putting it into the core (despite the hesitations from @tustvold).

@adriangb Is there any way you can test / verify that the structures in this crate would work for you (like can you temporarily copy/paste dedicated_executor.rs into your tree?)

/// Demonstrates running queries so that
/// 1. IO operations happen on the current thread pool
/// 2. CPU bound tasks happen on a different thread pool
async fn different_runtime_advanced() -> Result<()> {
Contributor Author

This is the example that shows the best practice for separating CPU and IO

//
// ctx.register_object_store(&base_url, http_store);
// A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.
let http_store = dedicated_executor.wrap_object_store_for_io(http_store);
Contributor Author

This pattern is @tustvold's core concern as I understand it -- that the overhead of transferring data back and forth between runtimes by wrapping streams could be avoided with lower-level hooks for IO.
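
For illustration, here is one common way such a stream bridge can be built: drive the inner stream on the IO runtime and hand items back over a channel. This is only a minimal sketch, assuming the tokio-stream crate is available; the names (forward_stream, io_handle) are illustrative and are not the API of this PR.

    use futures::{Stream, StreamExt};
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    /// Poll `input` on the IO runtime and forward each item over a channel so
    /// a consumer on another runtime can read the results.
    fn forward_stream<T>(
        io_handle: tokio::runtime::Handle,
        mut input: impl Stream<Item = T> + Send + Unpin + 'static,
    ) -> impl Stream<Item = T>
    where
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel(2); // small buffer; the size here is arbitrary
        io_handle.spawn(async move {
            while let Some(item) = input.next().await {
                if tx.send(item).await.is_err() {
                    break; // receiver dropped; stop polling the inner stream
                }
            }
        });
        ReceiverStream::new(rx)
    }

Every item crosses a channel between runtimes, which is the per-item overhead referred to above.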

@adriangb
Contributor

Yes I'll do just that and report back. I don't think it should block merging this as an example though.

I understand the hesitation to put it in core, but unless there's something better that's plug and play I think it's better to have this than nothing.

Could an alternative be to put it in a contrib crate or a new workspace package (so that CI also runs on it and it can have tests), add whatever APIs core needs to make it easier to plug in and document the setup? In my mind unless it's set up by default there isn't much more value in having it in core vs any other installable package.

// specific language governing permissions and limitations
// under the License.

//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime.
Contributor Author

This is a monster amount of code to put in an example -- much of it is tests and documentation, and I wrote it with an eye towards eventually putting it into DataFusion itself.

///
/// [`DedicatedExecutor::spawn_io`] for more details
#[derive(Debug)]
pub struct IoObjectStore {
Contributor Author

Here is the IoObjectStore that implements ObjectStore and wraps another ObjectStore that does IO on a different thread.
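
For illustration, a minimal sketch of the delegation idea behind such a wrapper: run the request itself on the IO runtime so only the result crosses back. The names (get_on_io_runtime, io_handle) are illustrative; the IoObjectStore in this PR routes calls through DedicatedExecutor::spawn_io instead.

    use std::sync::Arc;
    use object_store::{path::Path, GetResult, ObjectStore};

    /// Issue the GET on the IO runtime; the calling (CPU) runtime only awaits
    /// the join handle, not the network request itself.
    async fn get_on_io_runtime(
        io_handle: tokio::runtime::Handle,
        inner: Arc<dyn ObjectStore>,
        location: Path,
    ) -> object_store::Result<GetResult> {
        io_handle
            .spawn(async move { inner.get(&location).await })
            .await
            .expect("io task panicked")
    }

Note that the stream inside the returned GetResult would still be polled on the caller's runtime unless it is also wrapped, which is what the stream wrapping discussed elsewhere in this thread addresses.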

}

// -----------------------------------
// ----- Tests for IoObjectStore ------
Contributor Author

Here are tests that prove the IO is actually happening on the correct thread (both the original requests as well as the streams that are returned)

@alamb
Contributor Author

alamb commented Jan 25, 2025

Is there anyone willing to test out this approach in their application so we can get some confirmation that it avoids some of the timeouts reported while doing heavy processing of data from object store?

Some people who I think have reported seeing this issue:

I am willing to help make a draft PR to whatever repo to hook DedicatedExecutor up, if someone is willing to then test on a case that was seeing object store timeouts before

Also FYI @ozankabak as we discussed this topic recently.

@tustvold
Contributor

tustvold commented Jan 25, 2025

In the interests of avoiding confusion, as my objections appear to have gotten a little misinterpreted, I'd like to clarify that the fact this approach comes with non-trivial overheads is not what concerns me. Rather, it is that we know from experience at InfluxData that this pattern is fragile, easy to mess up, and leads to emergent behaviour that is highly non-trivial to reproduce and debug.

That being said as Andrew says, nobody has emerged who is able/willing to resolve this with a more holistic approach, e.g. something closer to what polars/DuckDb/Hyper are doing to separate IO/compute, and so proceeding with something is better than nothing. I just hoped someone might step up and run with something along the lines of #13692 (comment)

@ion-elgreco

Based on the previous discussions, and draft PRs, I ended up with this Object store wrapper to spawn the io tasks in a different handle: https://github.com/delta-io/delta-rs/blob/main/crates%2Fcore%2Fsrc%2Fstorage%2Fmod.rs#L116-L124

I do like the approach proposed in this PR, and worthwhile to have it be part of core.

Regarding testing it: since this wrapper was added, I haven't heard anyone mention these issues in delta-rs anymore, outside of @Tom-Newton. Did you end up resolving it on your end? I forgot to follow up on it.

Slightly off-topic, but I wonder if anything was done to surface the true error better? Because "error decoding response body" on its own is not clear enough if you don't have the context of why it can happen. I assume the body is incomplete because the request timed out; if that's the case, shouldn't that be the error message instead?

@Tom-Newton

We are still having quite significant problems, but we can't reproduce it reliably and I haven't been personally working on it recently.

Anecdotally we think it's more frequent when:

  1. Reading between regions (e.g. compute in Azure US South Central reading Azure US East blob storage).
  2. When spawning a large number of parallel jobs that all read the same thing using delta-rs.
  3. Reading one particular table where we need to read a larger data volume. delta-rs via object-store is still only reading metadata though (on the order of 100 MB), and the larger data volumes loaded using the pyarrow Azure filesystem don't seem to suffer the same problem.

@alamb
Contributor Author

alamb commented Jan 25, 2025

Based on the previous discussions, and draft PRs, I ended up with this Object store wrapper to spawn the io tasks in a different handle: https://github.com/delta-io/delta-rs/blob/main/crates%2Fcore%2Fsrc%2Fstorage%2Fmod.rs#L116-L124

This is neat -- I actually like that it is in terms of just a tokio runtime rather than something like DedicatedExecutor. I might try and rework IoObjectStore to follow that pattern instead.

One thing I noticed is that it doesn't actually shim the get result stream (so while the initial GET request runs on the other runtime, reading data from the returned stream will still happen on the original runtime).

I handled this by wrapping the stream like this:

    /// Wrap the result stream if necessary
    fn wrap_if_necessary(&self, payload: GetResultPayload) -> GetResultPayload {
        match payload {
            GetResultPayload::File(_, _) => payload,
            GetResultPayload::Stream(stream) => {
                let new_stream = self.dedicated_executor.run_io_stream(stream).boxed();
                GetResultPayload::Stream(new_stream)
            }
        }
    }

It might be an interesting thing to try in delta.rs 🤔

@ion-elgreco

@alamb ah good point, I missed that! Definitely good to add, will have a better look at where these payload streams are collected

Comment on lines +612 to +614
.on_thread_start(move || {
DedicatedExecutor::register_io_runtime(io_handle.clone())
})
Contributor

I'll note that this clobbers any on_thread_start set on runtime_builder. Unfortunately it is not stored publicly on RuntimeBuilder so it is not possible to pull off the existing setting and wrap it. I suggest we add an on_thread_start method to DedicatedExecutorBuilder?

Contributor

It'd be nice if there was also a way to warn users that we are clobbering their configuration...
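
For illustration, a sketch of the chaining suggested above, assuming a hypothetical builder hook; register_io_runtime here is only a stand-in for DedicatedExecutor::register_io_runtime, and build_cpu_runtime is not part of this PR.

    use tokio::runtime::{Builder, Handle, Runtime};

    /// Run both the executor's registration and the user's hook on every new
    /// worker thread, instead of silently replacing what the user configured.
    fn build_cpu_runtime(
        io_handle: Handle,
        user_hook: impl Fn() + Send + Sync + 'static,
    ) -> std::io::Result<Runtime> {
        Builder::new_multi_thread()
            .enable_all()
            .on_thread_start(move || {
                register_io_runtime(io_handle.clone());
                user_hook();
            })
            .build()
    }

    fn register_io_runtime(_handle: Handle) {
        // placeholder: the real function would stash the handle, e.g. in a thread local
    }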

@adriangb
Contributor

I've successfully made a PR to integrate this internally. It was pretty straightforward. We'll have to scrutinize it a bit to see if we can tell whether anything is missing (this is very error-prone code that can go bad in subtle ways), and if all goes well we will report back after this has been in production for a couple of days.

@davidhewitt
Contributor

This is neat -- I actually like that it is in terms of just a tokio runtime rather than somehthing like DedicatedExecutor. I might try and work on IoObjectStore to follow that pattern instead

I think that makes sense to me too. When I see the current IoObjectStore containing a DedicatedExecutor and then calling spawn_io, the abstraction feels muddled. Having just a handle to a runtime to spawn work on seems simpler and generalises better.

In the interests of avoiding confusion, as my objections appear to have gotten a little misinterpreted, I'd like to clarify that the fact this approach comes with non-trivial overheads is not what concerns me. Rather, it is that we know from experience at InfluxData that this pattern is fragile, easy to mess up, and leads to emergent behaviour that is highly non-trivial to reproduce and debug.

Just to put the brakes on rushing this into core too quickly, I have to support @tustvold in raising concern; we have had plenty of issues at Pydantic with this pattern where we have missed cases where we should have spawned IO (or CPU work) onto the other runtime.

It seems to me like most folks agree research into schedulers for the CPU work that can sit within the single tokio runtime would be much easier to integrate for downstream use cases, probably at the cost of complexity (overheads?) within datafusion itself.

We have already vendored this pattern and have it working for us, you don't need to rush this through for us.

@JanKaul
Contributor

JanKaul commented Jan 27, 2025

This looks great. I'll try to reproduce my issue and try it out.

And I have to dig a bit deeper into the topic to understand the concerns about this approach.


#[async_trait]
impl MultipartUpload for IoMultipartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
Contributor

@rohitrastogi Jan 28, 2025

In the S3 implementation of ObjectStore, put_part is 'tokio spawned' on the runtime of its caller (it uses a JoinSet and currently uses spawn and not spawn_on):
https://github.com/apache/arrow-rs/blob/6aaff7e38a573f797b31f89f869c3706cbe26e37/object_store/src/upload.rs#L211

If I understand the implementation and integration pattern correctly, this means that the put_part future for S3 will be spawned on the dedicated (CPU only) executor and not the IO runtime. We may have to patch the object store crate to get true IO/CPU isolation in this case.

Contributor

I think this just needs to be wrapped in spawn_io; the overhead of the additional tokio task will be irrelevant compared to the cross-RT and IO overheads.

The fact this is so hard to spot is a good example of why this pattern is really fragile

Contributor

@rohitrastogi Jan 30, 2025

I'm trying to wrap put_part with spawn_io, but I'm encountering some issues. For multipart uploads, I believe the part_idx needs to be incremented in the same order that PutPayload instances are generated from the stream being written. The indirection introduced by sending the put_part future to the I/O runtime via spawn_io seems to cause the payloads to be written out of order.

As a result, I'm seeing failures in complete() that indicate parts with a size smaller than 5MB are being sent before the last part.

I don't actually see anything in the standard S3MultipartUpload implementation that prevents this behavior, but I haven't encountered any failures related to this in the past, so I may be mistaken.

Contributor

@tustvold Jan 30, 2025

It shouldn't matter that they're written out of order as they're assembled based on the index captured when the future is created. Could you perhaps share your code?

Edit: I created an explicit test in object_store to show this - apache/arrow-rs#7047
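
For illustration, a minimal sketch (not the object_store implementation) of why out-of-order completion is safe when each index is captured synchronously at spawn time:

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let mut tasks = JoinSet::new();
        for (idx, part) in ["a", "b", "c"].into_iter().enumerate() {
            // idx is captured here, before any await, so the order in which
            // the uploads themselves finish does not matter.
            tasks.spawn(async move { (idx, format!("uploaded {part}")) });
        }
        let mut parts: Vec<(usize, String)> = Vec::new();
        while let Some(res) = tasks.join_next().await {
            parts.push(res.unwrap()); // completion order is arbitrary
        }
        parts.sort_by_key(|(idx, _)| *idx); // reassemble by captured index
        assert_eq!(parts[0].0, 0);
    }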

Contributor

@rohitrastogi Jan 31, 2025

Thank you, the test was very helpful and clearly illustrates that even though the async part uploads run concurrently, the synchronous part index assignment happens serially on the same thread. I accidentally changed this behavior by wrapping inner in an Arc<Mutex> (to fix some borrow checker issues) and having all of the inner put_part() calls contend for the mutex, which broke the ordering guarantees. Fixed it.

Contributor

@rohitrastogi, could you suggest the edit to this PR that resolved your issue with multipart uploads (or at least drop in a patch as a comment)? I'm just now realizing that the issue I'm seeing (#14286 (comment)) is exactly the one you were discussing... it took me a while to make the connection, but it now makes sense that there is some threshold above which object_store switches from a single put to a multipart upload, and that is exactly when I'm seeing the IO error, as you already reported.

@JanKaul
Contributor

JanKaul commented Jan 28, 2025

For your information, I was able to reproduce the IO stall error with a very simple example, in case anybody is interested and wants to try it out. I will test the dedicated executor tomorrow.

@alamb
Contributor Author

alamb commented Jan 28, 2025

It seems to me like most folks agree research into schedulers for the CPU work that can sit within the single tokio runtime would be much easier to integrate for downstream use cases, probably at the cost of complexity (overheads?) within datafusion itself.

@davidhewitt I agree in principle. I think any time one has to cross from sync code to async code the API is going to get challenging. The upside of the current DataFusion approach with a single tokio runtime is that it just "works" (for some definition of "works").

The downside is that squeezing out the maximum performance is tricky

Though I would argue postponing the pain until you actually need really good network performance might be better than forcing users to sort it out upfront before they can even run simple things

Of course, ideally it would "just work" for users and they wouldn't have to worry about it at all

@JanKaul
Contributor

JanKaul commented Jan 29, 2025

I tried the dedicated executor in my test example. I'm not entirely sure if I'm using it wrong or maybe my test is too contrived, but I'm still getting a single timeout error. This is already much better compared to using a single runtime, which produces many timeout errors.

I invite everybody to have a look, maybe you can spot a bug.

I will try the dedicated executor with a real example. The problem is that it is difficult to reproduce because the issue only occurred at a certain size.

@tustvold
Contributor

tustvold commented Jan 30, 2025

I took a look at your example @JanKaul, thank you for this as I think it very nicely demonstrates the challenge of shimming IO at the object store interface. Unfortunately I think this may be a different issue from the runtime stall issue.

At a high level the code is doing this

object_store.get().into_stream().map(|x| {
    // long CPU-bound pause while the response stream is held open
    sleep(Duration::from_secs(2));
}).try_collect()

The problem is this introduces 2-second pauses between polls of the streaming request from object storage. Regardless of runtime setup, holding a request open across a long-running task will result in timeouts, as backpressure will eventually cause the sender to time out. You would have the same issue if it were an "async" sleep, e.g. tokio::time::sleep.

There are two broad strategies to avoid this:

  1. Add an unbounded queue on the stream output
  2. Make separate requests each time for the work that can be performed, e.g. first 1MB, perform work, then fetch next MB

Option 1 will potentially buffer the entire input stream if the consumer is running behind, but should ensure the request runs to completion. Option 2 is, I think, the better approach, and is what code should probably be doing (FWIW this is what the parquet reader does).
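
For illustration, a minimal sketch of option 2: issue a bounded range request, do the CPU-bound work, then fetch the next chunk, so no response body is held open across the work. The chunk size and names are illustrative, and the exact get_range range type differs between object_store versions.

    use object_store::{path::Path, ObjectStore};

    async fn process_in_chunks(
        store: &dyn ObjectStore,
        location: &Path,
        total_len: usize,
    ) -> object_store::Result<()> {
        const CHUNK: usize = 1024 * 1024; // 1 MB per request
        let mut offset = 0;
        while offset < total_len {
            let end = (offset + CHUNK).min(total_len);
            // Short-lived request: the connection is not held across the work below.
            let bytes = store.get_range(location, offset..end).await?;
            do_cpu_work(&bytes); // stand-in for the expensive per-chunk processing
            offset = end;
        }
        Ok(())
    }

    fn do_cpu_work(_bytes: &[u8]) {
        // ... expensive processing ...
    }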

As an aside I also took the time to port over the custom executor approach, to show what that looks like - JanKaul/cpu-io-executor#1. I still think this is a cleaner approach, although, much like the DedicatedExecutor in this PR, it doesn't help with the issue this example is running into.

@JanKaul
Contributor

JanKaul commented Jan 30, 2025

Thanks a lot for having a look! I'll try to adopt strategy 2 to simulate the behavior of the parquet reader.

@djanderson
Contributor

Is there extra work I'd need to do to get this to work with the ParquetSink? This isn't a full reproducer but just a quick copy of the relevant section of the do_put_statement_ingest() method in my flightsql server.

I get

flight_sql: writing data to disk
thread 'tokio-runtime-worker' panicked at /Users/dja/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5:
A Tokio 1.x context was found, but timers are disabled. Call enable_time on the runtime builder to enable timers.

        let data_sink = ParquetSink::new(file_sink_config, table_options);

        let record_batch_stream =
            FlightRecordBatchStream::new_from_flight_data(flight_data_stream.map_err(|e| e.into()));

        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            record_batch_stream.map_err(|e| DataFusionError::External(Box::new(e))),
        ));

        // This displays "IoObjectStore"
        info!(
            "{}",
            self.datafusion_state
                .runtime_env()
                .object_store(object_store_url.as_ref())
                .unwrap()
        );

        info!("writing data to disk");
        let rows_written = self
            .dedicated_rt
            .spawn(async move { data_sink.write_all(stream, &datafusion.task_ctx()).await })
            .await
            .map_err(|e| Status::data_loss(format!("error processing stream: {e}")))?
            .map_err(|e| Status::data_loss(format!("data write failed: {e}")))?;
        info!("wrote {rows_written} rows");

@alamb
Contributor Author

alamb commented Feb 12, 2025

Is there extra work I'd need to do to get this to work with the ParquetSink? This isn't a full reproducer but just a quick copy of the relevant section of the do_put_statement_ingest() method in my flightsql server.

Maybe you need to run data_sink.write_all(stream, &datafusion.task_ctx()).await somehow on the IO thread? I am not sure what it is doing.

What is the panic error message?

@djanderson
Contributor

djanderson commented Feb 12, 2025

What is the panic error message?

Panic error message is just

error=status: DataLoss, message: "data write failed: IO error: Error joining spawned task: task 93 panicked with message "A Tokio 1.x context was found, but timers are disabled. Call enable_time on the runtime builder to enable timers."

Maybe you need to run data_sink.write_all(stream, &datafusion.task_ctx()).await somehow on the IO thread? I am not sure what it is doing.

Yeah, without dedicated executor, this is where I hit executor starvation, so I can't wrap the whole call in spawn_io, I don't think... otherwise it would be the same as not having the dedicated executor at all.

I thought this would work at first review because ParquetSink::write_all retrieves the object_store from the runtime_env and uses it to initialize the async arrow writer.

Maybe there's some interaction with the file_write_tasks JoinSet that I don't understand?

https://github.com/apache/datafusion/blob/branch-44/datafusion/core/src/datasource/file_format/parquet.rs#L865

djanderson added a commit to djanderson/parquet-sink-dedicated-exec-repro that referenced this pull request Feb 16, 2025
This runs the test exactly as I wrote it in

apache/datafusion#14286 (comment)

It passes, therefore it fails to demonstrate the failure I'm trying to
reproduce. This is just checked in as the initial commit to capture the
starting point.
@djanderson
Contributor

djanderson commented Feb 16, 2025

Alright so I was able to strip down a reproducer for the JoinError panic.

  • I was unable to trigger the issue with the MockStore, but can trigger it running against a localstack S3 container
  • I was unable to trigger the issue with one or a small number of small record batches. The issue only crops up with longer streams of data.

I imagine it could be slightly simpler but at least for me this is the easiest thing I could come up with that consistently triggers the issue.

https://github.com/djanderson/parquet-sink-dedicated-exec-repro

Would @alamb or anyone else potentially be able to see if this reproduces the error for you? I haven't given up on running the issue down but because of the nature of what does and doesn't trigger the issue, help from more knowledgeable folks would be hugely appreciated.

This is what a small number of record batches looks like (a failure to reproduce the issue). Important things to note in this case are:

  • the file is successfully written to minio
  • the client gets a successful response

This is what a larger number of record batches looks like, which demonstrates the tokio panic on the server. Important things to note in this case are:

  • the file fails to be written to minio
  • the client gets an error response

@djanderson
Contributor

djanderson commented Feb 16, 2025

Incidentally, by disabling the dedicated executor in this reproducer repo, it also (finally) seems to demonstrate the problem we're looking to solve in the first place: I can consistently get it to cause a client timeout and stream RST with full data loss, even with both server and client running in release mode.

The benefit to this over other attempts at reproducing the problem (including my own previous attempts) is that this is a very simple flight server and client, straight from the examples... the type of thing a new user of the ecosystem might reasonably write to test out the "FDAP" stack. There are no artificial timeouts and the only slight cheat may be that I am pushing fully random data over the pipe just to ensure that most of it doesn't get compressed away so that there's some significant disk IO.

/// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
/// it is dropped. Thus, you need to ensure the returned future lives until it
/// completes (call `await`), unless you wish to cancel it.
pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
Contributor

Suggested change
pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> + use<T>

Use the new RPIT lifetime precise-capturing syntax for compatibility with the newly released Rust 2024 edition; otherwise this example fails to compile.

Comment on lines +850 to +853
self.inner
.as_mut()
.expect("paths that take put inner back")
.put_part(data)
Contributor

@djanderson Feb 26, 2025

Wrap IoMultipartUpload::put_part() in spawn_io_static, otherwise this will panic.

Suggested change
self.inner
.as_mut()
.expect("paths that take put inner back")
.put_part(data)
DedicatedExecutor::spawn_io_static(
self.inner
.as_mut()
.expect("paths that take put inner back")
.put_part(data),
)
.boxed()

@djanderson
Contributor

Hey all, sorry for the noise on this thread lately, I've been working to try and understand how to apply this example to an actual DataFusion-based server, and not having much luck.

I now have the dedicated executor from this PR fully integrated and working in my executor starvation reproducer; however, I'm not observing a significant improvement. I see others mentioning that a similar approach has worked okay for them in the past, which makes me question what I'm seeing. I could really use another set of eyes on it.

I'm intentionally exercising the ingest path. It seems to more consistently exhibit the starvation issues, and the result of hitting the issue on ingest is potential data loss.... definitely something a database should avoid 😬

Does it show what I think it shows, or might it be some other phenomenon? If it shows 1) consistent executor starvation and 2) that this PR doesn't seem to improve it and 3) that we therefore have no working example as a community of how to ingest data into a server-deployed datafusion-based database, would we consider that a significant issue? Maybe even worth calling out as a priority on the roadmap?

As always, willing to keep testing with the threadpool approach or other suggestions. LMK.

@alamb
Contributor Author

alamb commented Feb 26, 2025

Hey all, sorry for the noise on this thread lately, I've been working to try and understand how to apply this example to an actual DataFusion-based server, and not having much luck.

Thank you @djanderson -- I hadn't seen https://github.com/djanderson/parquet-sink-dedicated-exec-repro

I hope to get back to this issue in the next week or two and try and come up with the next steps based on all the feedback so far.

@alamb
Contributor Author

alamb commented Mar 7, 2025

Hey all, sorry for the noise on this thread lately, I've been working to try and understand how to apply this example to an actual DataFusion-based server, and not having much luck.

@djanderson -- I played around with your example / repro and I think you may be hitting a gRPC / flight client timeout (not related to what the server is doing). I wrote up my findings here

@alamb
Contributor Author

alamb commented Mar 7, 2025

I have also made a reproducer showing how timeouts happen with slower internet connections / certain files (because DataFusion is making single large requests which can't be completed within a timeout):

@alamb
Contributor Author

alamb commented Mar 7, 2025

I have one more theory I want to chase down this afternoon and then I will write up my thoughts on next steps here

@alamb
Contributor Author

alamb commented Mar 7, 2025

I have one more theory I want to chase down this afternoon and then I will write up my thoughts on next steps here

My other theory was that DataFusion might start requests but not consume them in time, thus leading to timeout errors even though the response was ready. However, I was not able to find any evidence that this was actually happening

@adriangb
Contributor

adriangb commented Mar 7, 2025

My other theory was that DataFusion might start requests but not consume them in time, thus leading to timeout errors even though the response was ready. However, I was not able to find any evidence that this was actually happening.

I'd expect precisely something like this if CPU is blocking IO tasks because the IO struggles to make progress and eventually times out. Couple that with retries and you can end up in scenarios where it's hard to tell what's causing what.

@alamb
Contributor Author

alamb commented Mar 7, 2025

My other theory was that DataFusion might start requests but not consume them in time, thus leading to timeout errors even though the response was ready. However, I was not able to find any evidence that this was actually happening.

I'd expect precisely something like this if CPU is blocking IO tasks because the IO struggles to make progress and eventually times out. Couple that with retries and you can end up in scenarios where it's hard to tell what's causing what.

One thing that I noticed is that DataFusion uses ObjectStore::get_ranges which returns buffered Bytes -- so from the network perspective I think the data is being consumed 🤔

@alamb
Contributor Author

alamb commented Mar 7, 2025

After reading all the relevant feedback on this PR, here is my summary:

  1. Some people have found it valuable to have an ObjectStore which does I/O on a different tokio Runtime
  2. The DedicatedExecutor API seems overly complicated for what it is doing.

Thus my suggested next steps are:

  1. For the near term / workaround, break out the object store wrapper that passes work to a different runtime, perhaps based on the one in delta-rs
  2. Update this example to use that wrapper with a tokio::Runtime directly rather than a DedicatedExecutor
  3. Work on a more general-purpose IO API (see discuss: Introduce datafusion-storage as datafusion's own storage interface #14854 from @Xuanwo), which might make it easier to get this right without the shim / spawn_io

@ion-elgreco

Sounds like a prime candidate for that object-store-utils crate :)

@alamb
Contributor Author

alamb commented Mar 7, 2025

Sounds like a prime candidate for that object-store-utils crate :)

Yeah for sure. Now we just need to figure out where to put that (in apache or not 🤔 )

@ion-elgreco

Sounds like a prime candidate for that object-store-utils crate :)

Yeah for sure. Now we just need to figure out where to put that (in apache or not 🤔 )

I think it can be beneficial in Apache but I don't know these processes tbh.

It would also be great to add caching object store implementations in there as well, such as this one https://github.com/slatedb/slatedb/blob/main/src%2Fcached_object_store%2Fobject_store.rs (or one of the many others out there :p)

@tustvold
Contributor

tustvold commented Mar 8, 2025

FWIW the new HttpClient abstraction, introduced in ObjectStore 0.12, provides a potentially nicer way to spawn IO onto a separate runtime - apache/arrow-rs#7253

@alamb
Contributor Author

alamb commented Mar 13, 2025

After thinking about this more, what I am currently thinking is:

  1. Wait until [ObjectStore] Add SpawnService for running requests on different tokio runtime/Handle arrow-rs#7253 is available in DataFusion (likely April)
  2. Update this example to use that and another runtime directly (rather than dedicated executor, etc)

In parallel, I think discussions on how to get some more optimized implementations for object stores can proceed:

@alamb marked this pull request as draft on March 18, 2025 14:48
@alamb
Contributor Author

alamb commented Mar 18, 2025

Converting to a draft until we have the spawn service.


Successfully merging this pull request may close these issues.

Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work)
9 participants