[receiver/azure_functions] Initial implementation to receive logs from Azure Functions triggered by Event Hub #47050
…rt event_hub trigger, only logs signal Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>
mux := http.NewServeMux()
// TODO: Refactor so Start() collects path+profile specs from each trigger (logs and, later, metrics) and registers them in one loop; adding a trigger or signal should not duplicate registration logic here.
if r.cfg.EventHub != nil && len(r.cfg.EventHub.Logs) > 0 {
Should the server start regardless of this configuration? If EventHub is nil or no EventHub.Logs are configured, no routes will be registered.
The server will start if a correct http_config configuration is provided, but yes: if EventHub is nil or no EventHub.Logs are configured, there will be no invoke routes.
Routes and their corresponding profiles are created from the config (event_hub.logs only, for now); there are no default triggers.
Should we flag this early in the config Validate()? There's no point in moving forward with the startup sequence if we have zero bindings.
Validate() should require at least one trigger.
	p.protocol.Failure(w, fmt.Errorf("read body: %w", err), nil)
	return
}
defer r.Body.Close()
This should be deferred before body, err := io.ReadAll(r.Body), so that the body is also closed when io.ReadAll fails and the handler returns early.
			res.Attributes().PutStr(k, v)
		}
	}
}
Are import cycles the only reason for a new package here? Naming it handler is a bit confusing when there is also a handler.go file.
In my view this just includes types. I don't know if there are future plans for this package to include more functionality.
Yes, it was the main driver, also to place shared behavior used by consumers (not much for now).
I agree the overlap with handler.go is confusing; we can rename it to something like internal/invoke, wdyt?
> In my view this just includes types. I don't know if there are future plans for this package to include more functionality.
I was trying to keep the implementation generic and easy to extend to trigger types other than Event Hub, as discussed with @zmoog.
> Yes, it was the main driver, also to place shared behavior used by consumers (not much for now).
Why not internal/common ?
@zmoog, do you agree? I am not blocking on this.
Renamed to common to avoid naming confusion.
internal/common fails with the error "avoid meaningless package names", so I renamed it to trigger, as it contains types common to all triggers.
MichaelKatsoulis
left a comment
I just left some nits
… error: avoid meaningless package names Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>
@jmacd could you please have a look at this PR?
zmoog
left a comment
@tetianakravchenko, nice work! I've noted a few minor things we could refine, but I'm happy to ship it in its current state, if needed.
			res.Attributes().PutStr(k, v)
		}
	}
}
if merged.LogRecordCount() == 0 {
	return errors.New("no logs to consume")
}
Suggested change, replacing:
if merged.LogRecordCount() == 0 {
	return errors.New("no logs to consume")
}
with:
if merged.LogRecordCount() == 0 {
	// Decision: Log events that result in zero records are treated
	// as anomalies and rejected as permanent errors.
	return errors.New("no logs to consume")
}
Since this logic is a bit subtle, I'm calling it out explicitly so that reviewers and code-owners can push back if this doesn't align with our standards.
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(data); err != nil {
	http.Error(w, fmt.Sprintf("write response: %v", err), http.StatusInternalServerError)
}
Suggested change, replacing:
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(data); err != nil {
	http.Error(w, fmt.Sprintf("write response: %v", err), http.StatusInternalServerError)
}
with:
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(data) // headers are flushed on Write; nothing useful we can do on error
}
// profile binds a method name (binding/path) to a Protocol and Consumer.
// The generic HTTP handler uses it to: parse request with protocol, then consume with consumer.
type profile struct {
	method string
I think we should consider calling this binding instead of method.
My current understanding is that the Functions host sets Data.<name> in the HTTP request using the binding name from <function-name>/function.json:
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "logs", // <———— 👀
"direction": "in",
"eventHubName": "logs",
"connection": "EventHubConnectionString",
"cardinality": "many",
"consumerGroup": "ecf",
"dataType": "binary"
}
]
}
azure_functions:
  # HTTP server configuration
  http:
  http_config:
Since this PR contains the very first working implementation of the azure_functions receiver, I don't think we need to handle this as a formal breaking change.
IMO no actions needed here.
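On the `method` vs `binding` naming discussed above: if the host keys `Data` by the binding `name` from function.json, the lookup could be sketched as follows. The `invokeRequest` shape (top-level `Data` and `Metadata` objects) follows the Azure Functions custom handler request format as I understand it; the payload value is kept as raw JSON because its exact encoding for Event Hub batches is not shown in this thread. `profile.binding` and `payload` are hypothetical names, not the PR's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invokeRequest models only the assumed top-level shape of an invoke request:
// trigger payloads under "Data" keyed by binding name, plus "Metadata".
type invokeRequest struct {
	Data     map[string]json.RawMessage `json:"Data"`
	Metadata map[string]json.RawMessage `json:"Metadata"`
}

// profile is a hypothetical variant of the PR's type, with method renamed
// to binding as suggested in the review.
type profile struct {
	binding string // must match "name" in function.json
}

// payload returns the raw trigger payload for this profile's binding.
func (p profile) payload(body []byte) (json.RawMessage, error) {
	var req invokeRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return nil, fmt.Errorf("decode invoke request: %w", err)
	}
	raw, ok := req.Data[p.binding]
	if !ok {
		return nil, fmt.Errorf("binding %q not found in Data", p.binding)
	}
	return raw, nil
}

func main() {
	body := []byte(`{"Data":{"logs":["aGVsbG8="]},"Metadata":{}}`)
	raw, err := profile{binding: "logs"}.payload(body)
	fmt.Println(string(raw), err) // ["aGVsbG8="] <nil>

	_, err = profile{binding: "metrics"}.payload(body)
	fmt.Println(err) // binding "metrics" not found in Data
}
```

Naming the field `binding` makes the coupling explicit: renaming the binding in function.json without updating the receiver config surfaces as a clear "not found" error rather than a silent mismatch.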
The Azure Functions host is the only client here, but the request size depends on the trigger: for Event Hub it's (max message size × batch count × base64 overhead). Consider adding a
body, err := io.ReadAll(io.LimitReader(r.Body, maxRequestBodySize))
100 MB covers the worst realistic Event Hub scenario (20 MB premium messages ×
Future triggers should evaluate their own payload characteristics and document whether
And the config change would look like:
type Config struct {
	HTTP     *confighttp.ServerConfig `mapstructure:"http_config"`
	Auth     component.ID             `mapstructure:"auth"`
	EventHub *EventHubTriggerConfig   `mapstructure:"event_hub"`
	// MaxRequestBodySize is the maximum allowed size of an incoming invoke request body.
	// Defaults to 100 MB, which covers premium Event Hub tiers (20 MB messages)
	// with large batch sizes and base64 encoding overhead.
	// Future trigger types should verify this default fits their payload profile.
	MaxRequestBodySize int64 `mapstructure:"max_request_body_size"`
}
With the default set in the factory:
func createDefaultConfig() component.Config {
	return &Config{
		MaxRequestBodySize: 100 * 1024 * 1024, // 100 MB
	}
}
This keeps it simple: one field, one LimitReader, and a comment trail for whoever adds the next trigger type.
@zmoog thank you for the review!
In this case we might end up processing a truncated payload without noticing. Additionally, there is already an existing max request body setting in https://pkg.go.dev/go.opentelemetry.io/collector/config/confighttp#ServerConfig. I think it is better to avoid two competing limits, wdyt?
@constanca-m @jmacd could you please have a look at this PR?
constanca-m
left a comment
Thanks! Looks very good. I am relying a bit on @zmoog's review here, even though I have now read the whole code. Just two questions, but LGTM.
// Logs defines configuration for log records received from Azure Functions.
Logs EncodingConfig `mapstructure:"logs"`
HTTP *confighttp.ServerConfig `mapstructure:"http_config"`
The http_config name is a bit awkward. Usually the HTTP server config is either squashed or called http. Is there a reason why you changed it?
For context, here's what the current config.yaml looks like:
azure_functions:
  http_config:
    endpoint: test:123
  auth: azureauth
  event_hub:
    logs: # signal (logs, metrics, traces)
      - name: logs # event hub name
        encoding: azure_encoding
      - name: raw_logs # event hub name
        encoding: beats_encoding
In the above example, event_hub is the trigger type, logs is the signal type, and each entry maps an event hub name to the encoding extension to use.
Since this is an Azure Functions receiver, we'll possibly have more trigger types in the future, like timer or http triggers. So, if we keep the trigger types at the root level, we can't use http for the server config, because it would clash with a future http trigger.
The alternative is to make the trigger explicit, like this:
azure_functions:
  http:
    endpoint: test:123
  auth: azureauth
  triggers:
    event_hub:
      logs: # signal (logs, metrics, traces)
        - name: logs # event hub name
          encoding: azure_encoding
        - name: raw_logs # event hub name
          encoding: beats_encoding
@constanca-m, how does this look?
Adding triggers feels a bit redundant now, but having all the trigger types in one bucket seems tidier. I’m going to make this change and revert to the idiomatic config option. Sorry for the back-and-forth, @tetianakravchenko!
	errs = append(errs, errors.New("at least one configured trigger with at least one binding is required"))
}
if cfg.EventHub != nil {
Is it possible for the event hub to be nil? I thought it was mandatory for the receiver to work properly, but maybe I am wrong.
Since the event hub is the only trigger now, we can simply assume it's required. WDYT?
For now only the Event Hub trigger is supported, but I tried to keep it generic. So instead of making the event hub required, there is a check above: "at least one configured trigger with at least one binding is required".
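A stdlib-only sketch of that generic check, using the `triggers` layout proposed in the config thread. The type names (`Config`, `Triggers`, `EventHubTrigger`, `Binding`) and the `bindingCount` helper are hypothetical; only the error message comes from the PR. The `mapstructure` tags are plain struct tags here, so no decoding library is needed to compile the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Binding is one event hub entry under a signal, e.g. event_hub.logs[i].
type Binding struct {
	Name     string `mapstructure:"name"`
	Encoding string `mapstructure:"encoding"`
}

// EventHubTrigger groups bindings per signal; only logs exist so far.
type EventHubTrigger struct {
	Logs []Binding `mapstructure:"logs"`
}

// Triggers holds every trigger type in one bucket; each one is optional.
type Triggers struct {
	EventHub *EventHubTrigger `mapstructure:"event_hub"`
}

// Config keeps only the fields needed for this sketch.
type Config struct {
	Triggers Triggers `mapstructure:"triggers"`
}

// bindingCount sums bindings across all trigger types; nil counts as zero.
// A new trigger type only needs one more term here.
func (t Triggers) bindingCount() int {
	n := 0
	if t.EventHub != nil {
		n += len(t.EventHub.Logs)
	}
	return n
}

// Validate rejects configs without any binding, regardless of trigger type.
func (cfg *Config) Validate() error {
	var errs []error
	if cfg.Triggers.bindingCount() == 0 {
		errs = append(errs, errors.New("at least one configured trigger with at least one binding is required"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	fmt.Println((&Config{}).Validate())

	ok := &Config{Triggers: Triggers{EventHub: &EventHubTrigger{
		Logs: []Binding{{Name: "logs", Encoding: "azure_encoding"}},
	}}}
	fmt.Println(ok.Validate()) // <nil>
}
```

Treating a nil trigger as "zero bindings" keeps `EventHub` optional, which is what allows future trigger types without making any single one mandatory.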
LGTM.
Hi @jmacd, all required codeowner approvals are in - is there anything else needed from my side before this is ready to merge? |
…ger types in one bucket Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>
Description
Follow up of #46584
This is the initial implementation of the new azure_functions receiver.
Focus of this PR:
Link to tracking issue
Fixes #43507
Testing
Corresponding unit tests were added.
Documentation
The README was updated to reflect the PR changes.