
new feature (or bug?): concurrent limit layer and concurrent read could lead to deadlock #7338

@dentiny

Description


Feature Description

Correct me if I'm wrong; this is my understanding of how concurrent read works:

  • When users call read_with().chunk().concurrent(C), the concurrent tasks create C slots and try to issue as many HTTP requests as possible.
  • If the concurrent limit layer is applied, each HTTP request must acquire a permit first, and the acquired permit stays valid until the stream is dropped.
  • The issue is that the concurrent tasks hold ownership of the opened streams, and only synchronize once the operation-level concurrency limit (the C above) is reached.

In a detailed example:

  • Suppose the concurrent limit is set to 2, and we issue 4 read operations, which are turned into 16 HTTP requests.
  • Assume the HTTP requests are sent in this order:
    • read operation 1, HTTP request 1 -> acquires a permit and holds it
    • read operation 2, HTTP request 1 -> acquires a permit and holds it
    • All later requests block at permit acquisition. The two held permits are only released when their streams are dropped, but the streams are never consumed because their operations are still waiting to open more readers: a deadlock.

I found this issue annoying because it doesn't always reproduce in production.

Problem and Solution

As a workaround, I implemented my own concurrent limit layer, with two main goals:

  • No non-deterministic deadlocks
  • A process-wide concurrency limit for HTTP requests, which serves as a workaround for the missing TCP connection pool

Before every read operation, I attempt a CAS operation to acquire a batch of permits, which are released when the read operation finishes.

It works, but it adds code complexity at the application layer.

I'm wondering if it's possible to connect (1) the concurrent tasks and (2) the concurrent limit layer (or self-implemented layers) more tightly, so that the tasks are aware of the current actual limit.
For example, could a functor be passed to the chunk reader?

Additional Context

Repro script

// Imports assumed for this repro; it relies on opendal's internal/raw
// test APIs, so the exact paths may differ by version.
use std::sync::Arc;
use std::time::Duration;

use futures::stream;
use http::Response;
use tokio::time::timeout;

use opendal::layers::ConcurrentLimitLayer;
use opendal::raw::*;
use opendal::*;

#[tokio::test]
async fn http_semaphore_deadlocks_with_concurrent_chunk_read() {
    // A mock HTTP fetcher that returns 1 KiB of data for every request.
    struct DataFetcher;
    impl HttpFetch for DataFetcher {
        async fn fetch(&self, _req: http::Request<Buffer>) -> Result<Response<HttpBody>> {
            let data = Buffer::from(vec![0u8; 1024]);
            Ok(Response::builder()
                .status(200)
                .body(HttpBody::new(stream::iter([Ok(data)]), Some(1024)))
                .unwrap())
        }
    }

    #[derive(Debug)]
    struct MockHttpService {
        info: Arc<AccessorInfo>,
    }

    impl Access for MockHttpService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        fn info(&self) -> Arc<AccessorInfo> {
            self.info.clone()
        }

        async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> {
            // 4 KiB file → 4 chunks of 1 KiB each.
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(4096),
            ))
        }

        async fn read(&self, _path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
            let req = http::Request::builder()
                .uri("http://fake/data")
                .body(Buffer::new())
                .unwrap();
            let resp = self.info.http_client().fetch(req).await?;
            Ok((RpRead::default(), Box::new(resp.into_body())))
        }
    }

    // Wire up the mock service with DataFetcher as the HTTP backend.
    let info = Arc::new(AccessorInfo::default());
    info.set_scheme("mock_http").set_native_capability(Capability {
        stat: true,
        read: true,
        ..Default::default()
    });
    info.update_http_client(|_| HttpClient::with(DataFetcher));

    let svc: Arc<dyn AccessDyn> = Arc::new(MockHttpService { info });

    // Apply the concurrent-limit layer:
    //   • operation semaphore = 1024  (effectively unlimited)
    //   • HTTP semaphore      = 2     (tight limit, less than concurrent=4)
    let op = Operator::from_inner(svc)
        .layer(ConcurrentLimitLayer::new(1024).with_http_concurrent_limit(2));

    // read_with().concurrent(4).chunk(1024) triggers ChunkedReader which
    // tries to keep 4 readers in flight.  With only 2 HTTP permits the
    // 3rd next_reader() call blocks forever → deadlock.
    let result = timeout(
        Duration::from_secs(2),
        op.read_with("test_file").concurrent(4).chunk(1024),
    )
    .await;

    assert!(
        result.is_ok(),
        "read_with().concurrent(4) deadlocked: HTTP semaphore permits exhausted \
            by unconsumed readers inside ConcurrentTasks (see issue #7338)"
    );
}

Are you willing to contribute to the development of this feature?

  • Yes, I am willing to contribute to the development of this feature.
