-
Notifications
You must be signed in to change notification settings - Fork 326
Server, Redcon, Mertics, Main #4459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: streams-adapters
Are you sure you want to change the base?
Conversation
|
| maxWaitTime := 60 * time.Second | ||
| checkInterval := 500 * time.Millisecond |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: These sounds like good candidates for config values.
This is true for most numbers that appear in the PR - web server configs and so on. This doesn't mean we NEED them in a conf or that moving them to a conf needs to happen in this PR.
| // Check if we've exceeded the maximum wait time | ||
| if time.Since(startTime) > maxWaitTime { | ||
| logger.Error("EA server did not become ready within timeout", "timeout", maxWaitTime) | ||
| log.Fatal("EA server startup timeout") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works as is and needs no change.
That said, the more idiomatic way to do the same thing would be to define a timer outside the loop:
timer := time.After(maxWaitTime)
and then replace the block above with this:
select {
case <-timer:
logger.Error("EA server did not become ready within timeout", "timeout", maxWaitTime)
log.Fatal("EA server startup timeout")
default:
}
The reason we prefer this is that is gives us a very clean way to add multiple blocking conditions in there (channels, state, etc.).
As I said above, the current code needs no change.
| if err == nil && resp.StatusCode == http.StatusOK { | ||
| resp.Body.Close() | ||
| logger.Info("EA server is ready", "elapsed", time.Since(startTime)) | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We wait for the EA to be able to respond successfully to a /health call.
Do we need to inspect the body of the response? Is there a case where the EA might respond successfully but still be unhealthy?
| }) | ||
|
|
||
| // Create error channel for goroutine failures | ||
| errChan := make(chan error, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where we read from this channel. I see that we write to it from the goroutine but I don't see a read.
| if err := redconServer.Start(); err != nil { | ||
| log.Fatal(err) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might need some help understanding how this ties to the web server, too. You mentioned this already. :)
| httpClient *http.Client | ||
| subscriptionTracker sync.Map | ||
| metrics *appMetrics.Metrics | ||
| ctx context.Context // Add this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the comment is probably outdated now.
| defer errorResponsePool.Put(errorResp) | ||
|
|
||
| errorResp.Error.Name = "AdapterError" | ||
| errorResp.Error.Message = "Unable to subsribe to an asset pair with the the requested data" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| errorResp.Error.Message = "Unable to subsribe to an asset pair with the the requested data" | |
| errorResp.Error.Message = "Unable to subscribe to an asset pair with the the requested data" |
| if _, alreadySubscribing := s.subscriptionTracker.LoadOrStore(subscriptionKey, true); !alreadySubscribing { | ||
| s.logger.Debug("Initiating new subscription", "requestParams", canonicalParams, "subscriptionKey", subscriptionKey) | ||
| go func(key string, params types.RequestParams) { | ||
| s.subscribeToAsset(params) | ||
| // Remove from tracker after subscription attempt completes | ||
| // Allow retries after 10 seconds if data still not available | ||
| time.Sleep(10 * time.Second) | ||
| s.subscriptionTracker.Delete(key) | ||
| }(subscriptionKey, canonicalParams) | ||
| } else if s.config.LogLevel == "debug" { | ||
| s.logger.Debug("Subscription already in progress, skipping", "key", subscriptionKey) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will need some guidance here. :)
| for key, value := range params { | ||
| dataMap[key] = value | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to copy the data from params instead of using it directly?
| cache "streams-adapter/cache" | ||
| config "streams-adapter/config" | ||
| helpers "streams-adapter/helpers" | ||
| redcon "streams-adapter/redcon" | ||
| server "streams-adapter/server" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: These aliases are redundant.
Closes DS-1421, DS-1423
Description
Implements the following functionality:
http_requests_total