Skip to content

HTTP-182 HTTP sink retries#181

Open
Jonathan Lehto (jonathanlehto) wants to merge 12 commits into
getindata:mainfrom
jonathanlehto:http-sink-retries
Open

HTTP-182 HTTP sink retries#181
Jonathan Lehto (jonathanlehto) wants to merge 12 commits into
getindata:mainfrom
jonathanlehto:http-sink-retries

Conversation

@jonathanlehto
Copy link
Copy Markdown

@jonathanlehto Jonathan Lehto (jonathanlehto) commented Oct 27, 2025

Description

Attempting to re-vitalize the http sink updates. Branch ended up getting bigger than I hoped, but please provide any feedback you have!

Resolves <issue nr here>

PR Checklist

"failed requests: {}, throwing BatchHttpStatusCodeValidationFailedException from sink",
failedRequests
);
getFatalExceptionCons().accept(new BatchHttpStatusCodeValidationFailedException(
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For context, the README reads like an error should trigger a job failure, which is why I threw a fatal exception here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the introduction of ignore codes an the like this is true. I am not sure if this is the case for the old allow and deny scenario

Copy link
Copy Markdown
Contributor

@davidradl David Radley (davidradl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that you raise an issue for each of the 3 things you would like to put in. It will be far easier to review smaller pieces.

  • the retries for sink should be very self contained piece of code.
  • for the ratelimitter -I see we have apache/flink#27134 which is not yet merged. Is there a generic way we can do Sink in the Flink code base ?
  • fyi - we are porting over changes to https://github.com/apache/flink-connector-http and are hoping to release this in the near future

Comment thread README.md Outdated
You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees.

#### Retries and delivery guarantee
HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes.
Copy link
Copy Markdown
Contributor

@davidradl David Radley (davidradl) Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sink.delivery-guarantee is interesting and looks like it is a bringing this connector in line with others. When do we think we will get more that one request issued? I assume we would need more processing for exactly once.

Copy link
Copy Markdown
Author

@jonathanlehto Jonathan Lehto (jonathanlehto) Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am lifting a big part of the code from an earlier PR, which was the source of using a delivery guarantee. However, for my personal use case, I do in fact need to guarantee at least once behavior 🙂. The current implementation has an indefinite number of retries. If we don't have the delivery guarantee, we'll need to have something like number of attempts imo. I'm fine with whatever though! I just really need retries!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly once is pretty challenging given that we are using HTTP requests. Imo, it would require coordination with the receiving service to response with some kind of ack in order to enforce it. I don't believe exactly once can be achieved on just the client side with HTTP requests if that makes sense. However, duplicated success message emissions should be really infrequent

@jonathanlehto
Copy link
Copy Markdown
Author

Jonathan Lehto (jonathanlehto) commented Oct 30, 2025

I would suggest that you raise an issue for each of the 3 things you would like to put in. It will be far easier to review smaller pieces.

That's great to hear this could make it in the Apache foundation! I can certainly break up the retry behavior and the rate limiter. However, I'm not sure if I could separate the delivery guarantee and the HTTP code refactor. I don't the current sink configuration makes much sense, and I would like to consolidate the code parsing logic between the sink and polling clients. Let me know if you have thoughts there.. I can certainly try to break the httpd code changes and delivery guarantee up if you still think it would be helpful!

You probably meant the config changes, the rate limiting, and the retry behavior 🤦 . I can do that

Comment thread README.md Outdated
Comment thread README.md Outdated
Comment thread README.md Outdated
Comment thread README.md Outdated
Comment thread src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java Outdated
@Slf4j
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> {

private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it is worth making these parameters configurable?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but given #181 (review) concerns. I would like to just remove the retry strategy from the pull request all-together. A retry policy is almost always present if retries are a thing, however, I can understand David Radley (@davidradl)'s concerns and I'm hesitant to block retries due to retry policy implementation if that makes sense. Please let me know your thoughts!

@grzegorz8 Grzegorz Kołakowski (grzegorz8) linked an issue Nov 1, 2025 that may be closed by this pull request
@grzegorz8 Grzegorz Kołakowski (grzegorz8) changed the title Http sink retries HTTP-182 HTTP sink retries Nov 1, 2025
Comment thread CHANGELOG.md
Comment thread README.md
@davidradl
Copy link
Copy Markdown
Contributor

David Radley (davidradl) commented Dec 4, 2025

fyi #200

@davidradl
Copy link
Copy Markdown
Contributor

Jonathan Lehto (@jonathanlehto) Sorry for the long delay; this and the other 2 PRs are capabilities that we (IBM) would like to get merged. I think the current state of the code is good enough to merge, but to be extra diligent I would like to have this ported to the Apache HTTP connector and have Ferenc review it. Are you ok to bring this code up to date here and port to the Apache HTTP connector - under 3 Apache Flink Jiras. Once it is reviewed in the apache repo we can implement any feedback and merge in both places. If you port the fix these will be recognised Apache Flink contributions. If you are no long wanting to work on this let me know and I will take over when it gets to the top of my priority list.

fyi Grzegorz Kołakowski (@grzegorz8)

@jonathanlehto
Copy link
Copy Markdown
Author

Jonathan Lehto (@jonathanlehto) Sorry for the long delay; this and the other 2 PRs are capabilities that we (IBM) would like to get merged. I think the current state of the code is good enough to merge, but to be extra diligent I would like to have this ported to the Apache HTTP connector and have Ferenc review it. Are you ok to bring this code up to date here and port to the Apache HTTP connector - under 3 Apache Flink Jiras. Once it is reviewed in the apache repo we can implement any feedback and merge in both places. If you port the fix these will be recognised Apache Flink contributions. If you are no long wanting to work on this let me know and I will take over when it gets to the top of my priority list.

fyi Grzegorz Kołakowski (@grzegorz8)

Yes, I can do that, thanks for updating me! I should be able to get it done by the end of the day on Friday. If I am unable to do so by then, likely something has come up.

Add tests

Add RateLimitingStrategy

Add Http Sink capability, DeliveryMode support, introduce http lookup compatible constants, deprecate old constants, consolidate HTTP Code evaluation logic

addressed code review comments, README updates, SinkHttpClientResponse refactor, and switched default to At-Least-Once

update changelog and readme

update white/black terminology to approve/deny
@davidradl
Copy link
Copy Markdown
Contributor

Jonathan Lehto (@jonathanlehto) thanks for makeing these changes- can I check what order should I review and look to merge the 3 PRs - should I look at this one first as it is oldest?

@jonathanlehto
Copy link
Copy Markdown
Author

Jonathan Lehto (@jonathanlehto) thanks for makeing these changes- can I check what order should I review and look to merge the 3 PRs - should I look at this one first as it is oldest?

This merge request is all three components combined; however, rate limiting may need to be updated from your earlier comment. The http182-2 branch has a minimal set of changes for just retries, and the http183 branch is just the configuration consolidation between the sink and lookup tables.

The two other branches will conflict once merged, iirc, but would be easier to review them as opposed to this one. I could also create a rate limitting branch, now that it's been done on the source side as well

@davidradl
Copy link
Copy Markdown
Contributor

Jonathan Lehto (@jonathanlehto) thanks for makeing these changes- can I check what order should I review and look to merge the 3 PRs - should I look at this one first as it is oldest?

This merge request is all three components combined; however, rate limiting may need to be updated from your earlier comment. The http182-2 branch has a minimal set of changes for just retries, and the http183 branch is just the configuration consolidation between the sink and lookup tables.

The two other branches will conflict once merged, iirc, but would be easier to review them as opposed to this one. I could also create a rate limitting branch, now that it's been done on the source side as well

Jonathan Lehto (@jonathanlehto) Ok thanks for your quick response - I will review this as the whole change. I suggest we remove the rate limiting part and deal with that separately. We can then get the uncontentious part in quickly.

List<HttpSinkRequestEntry> requestEntries = Arrays.asList(request1, request2);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);

// would be good to use Countdown Latch instead sleep...
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed - I know the Flink community are not keen on sleeps. Could we remove all the sleeps from the unit tests please.

"500,503,504"
);
String successCodeExpr = properties.getProperty(
HttpConnectorConfigConstants.SINK_SUCCESS_CODES,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting the success ignore lists of codes to be in a similar file to the source ones.
The source has HttpLookupConnectorOptions which uses the flink config to define the defaults. I was expecting similar content in HttpDynamicSinkConnectorOptions.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I can't recall why I didn't do that originally...

);
String successCodeExpr = properties.getProperty(
HttpConnectorConfigConstants.SINK_SUCCESS_CODES,
"1XX,2XX,3XX"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are different defaults to the source - I suggest they should be the same for consistency unless. these is a reason to add 1xx and 3xx to sink specifically.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to preserve backwards compatibility. The old system had an inverted behavior, in which inverting failure codes to success codes results in this configuration. I can update it match the source defaults


private static HttpResponseChecker createHttpResponseCheckerWithDefaults(Properties properties)
throws ConfigurationException {
String ignoreCodeExpr = properties.getProperty(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we refactor the code so the processing of the source an sink ignore faiure and success responses are done in the same method , with either the source or sink values from config being passed in?

Copy link
Copy Markdown
Contributor

@davidradl David Radley (davidradl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made some comments:

  • I suggest removing the rate limiting part and having that separate.
  • I am suggesting using the same code for retries in sink and source if possible.
  • we should check that the old and the new config options are not both present
  • Add migration guide for users upgrading from previous versions
  • Document performance implications of AT_LEAST_ONCE vs NONE
  • source retries ioexceptions but sink does not.

@jonathanlehto
Copy link
Copy Markdown
Author

David Radley (@davidradl) , Thank you for the comments! I should be able to address them all tomorrow!

@jonathanlehto
Copy link
Copy Markdown
Author

David Radley (@davidradl) I went ahead and made the changes. Please take a look! I put the different comments in their own commit. Also, I did what I could to address "suggesting using the same code for retries in sink and source if possible", but I feel like the two are fundamentally incompatible due to the use of delivery guarantees. I'm not sure we can have an AT_LEAST_ONCE policy with a finite number of retries if that makes sense! Maybe we could throw if retries get exhausted in the sink, but I would like to hear your thoughts!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Http Sink Retries

3 participants