[BUG] Cloudwatch Sink high memory usage, does not backpressure #6141

@JonahCalvo

Description

Describe the bug
When the CloudWatch sink cannot keep up with incoming data, tasks pile up in an unbounded executor queue.

In PR #5770:

Modify cloudwatch sink to use FixedThreadPool. The existing newCachedThreadPool() creates huge amount of threads (leading to out of memory error in some cases) when there is large amount of events created.

But from what I've found, a FixedThreadPool still uses an unbounded work queue:
https://stackoverflow.com/questions/53913601/what-is-the-use-case-for-unbounded-queue-in-java-executors
When the CloudWatch API is slow, that queue keeps accepting new batches without limit, so heap usage grows until the circuit breaker flips.
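For reference, this is how OpenJDK's `java.util.concurrent.Executors` builds the fixed pool; the `LinkedBlockingQueue` has no capacity bound:

```java
// Executors.newFixedThreadPool (OpenJDK): the work queue is unbounded,
// so submitted tasks accumulate without limit whenever the pool falls behind.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
```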

The pool creates its threads up front, but the Data Prepper process worker threads then queue events onto the executor, and a pool thread drains that queue. In effect this is a second, hidden buffer, and it breaks our ability to apply backpressure on the overall pipeline.

We may want to consider dropping these threads entirely, but that could be a significant change.
A possible short-term solution would be to add a blocking queue here (see the sketch below):
https://github.com/opensearch-project/data-prepper/blob/087ced7fc54fb37b9f3b4dcec9[…]ugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java
Maybe limit its capacity to the number of process workers.
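A minimal sketch of that short-term fix, assuming the executor construction in CloudWatchLogsDispatcher can be swapped out (the class, method, and parameter names below are illustrative, not the actual code; in particular `numberOfProcessWorkers` is an assumed input):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedDispatcherExecutor {
    /**
     * Builds a fixed-size pool whose work queue is capped (e.g. at the number of
     * process workers). When the queue is full, CallerRunsPolicy makes the
     * submitting process worker run the task itself, so producers slow down
     * instead of enqueueing batches without limit.
     */
    static ExecutorService create(final int numberOfThreads, final int numberOfProcessWorkers) {
        return new ThreadPoolExecutor(
                numberOfThreads, numberOfThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(numberOfProcessWorkers),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

CallerRunsPolicy is one way to turn a bounded queue into backpressure; an alternative would be a RejectedExecutionHandler that calls `put()` on the queue so the submitting thread blocks rather than executing the task itself.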

To Reproduce
Steps to reproduce the behavior:

  1. Create a CloudWatch sink pipeline with a small batch size
  2. Ingest a large amount of data
  3. Observe low buffer usage, yet the circuit breaker flips because the sink consumes the available heap

Expected behavior
Sink should apply backpressure
