[Enhancement] Optimize OffsetStorageWriterImpl#beginFlush method handle logic #4795

Closed
@mxsm

Search before asking

  • I have searched the issues and found no similar issues.

Enhancement Request

public synchronized boolean beginFlush() {
    if (isFlushing()) {
        throw new RuntimeException("OffsetStorageWriter is already flushing");
    }
    if (data.isEmpty()) {
        return false;
    }
    this.toFlush = this.data;
    this.data = new HashMap<>();
    return true;
}

If a flush is already in progress when SourceWorker#commitOffsets calls beginFlush, a RuntimeException is thrown. The exception propagates upward and causes the Source to exit.
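
The failure mode can be reproduced in isolation. The sketch below is a simplified stand-in, not the project's class: the name FlushingWriterDemo, the put helper, the String-typed maps, and the assumption that isFlushing() simply means "toFlush is not null" are all hypothetical. Only the body of beginFlush is taken from the snippet above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for OffsetStorageWriterImpl.
class FlushingWriterDemo {
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    // Assumption: a flush counts as "in progress" while a snapshot is pending.
    public synchronized boolean isFlushing() {
        return toFlush != null;
    }

    // Hypothetical helper that buffers one offset entry.
    public synchronized void put(String key, String value) {
        data.put(key, value);
    }

    // Same logic as the method quoted in this issue.
    public synchronized boolean beginFlush() {
        if (isFlushing()) {
            throw new RuntimeException("OffsetStorageWriter is already flushing");
        }
        if (data.isEmpty()) {
            return false;
        }
        this.toFlush = this.data;
        this.data = new HashMap<>();
        return true;
    }

    // Returns true if a second beginFlush() throws while the first is still pending.
    static boolean secondCallThrows() {
        FlushingWriterDemo writer = new FlushingWriterDemo();
        writer.put("partition-0", "offset-1");
        writer.beginFlush();                    // first flush starts; snapshot is pending
        writer.put("partition-0", "offset-2");
        try {
            writer.beginFlush();                // arrives before the first flush completes
            return false;
        } catch (RuntimeException e) {
            return true;                        // an uncaught caller would terminate here
        }
    }
}
```

In this sketch nothing ever clears toFlush, which models a flush that has not finished by the time the next commit cycle runs.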

Describe the solution you'd like

Replace this exception with a log message and return false.
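
A minimal sketch of that proposal follows, under stated assumptions. The class name LenientOffsetWriterSketch, the put and bufferedCount helpers, and the use of java.util.logging are hypothetical choices made to keep the example self-contained; the real class may use a different logger and different map types. It also assumes isFlushing() means "toFlush is not null".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch of the proposed behaviour, not the project's implementation.
class LenientOffsetWriterSketch {
    private static final Logger LOG =
        Logger.getLogger(LenientOffsetWriterSketch.class.getName());

    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    // Assumption: a flush counts as "in progress" while a snapshot is pending.
    public synchronized boolean isFlushing() {
        return toFlush != null;
    }

    // Hypothetical helper that buffers one offset entry.
    public synchronized void put(String key, String value) {
        data.put(key, value);
    }

    // Hypothetical helper: number of entries still waiting for a later flush.
    public synchronized int bufferedCount() {
        return data.size();
    }

    public synchronized boolean beginFlush() {
        if (isFlushing()) {
            // Proposed change: log and skip this round instead of throwing.
            LOG.warning("OffsetStorageWriter is already flushing; skipping this flush");
            return false;
        }
        if (data.isEmpty()) {
            return false;
        }
        this.toFlush = this.data;
        this.data = new HashMap<>();
        return true;
    }
}
```

With this change, entries written during an in-flight flush stay in data and would be picked up by a later beginFlush call. Note that false now covers two cases, "nothing to flush" and "a flush is already running", so a caller that needs to tell them apart would have to check isFlushing() itself; how SourceWorker#commitOffsets should react is left open here.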

I hope you understand how offsets are managed. It's not just a simple replacement.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
