Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/azurefunctions-eventhub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/azure_functions

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Initial implementation of the Azure Functions receiver to ingest logs from Azure Functions triggered by Event Hub.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43507]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
41 changes: 24 additions & 17 deletions receiver/azurefunctionsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

## Overview

The Azure Functions receiver is an OpenTelemetry Collector receiver that integrates with Azure Functions as a custom handler. It receives logs from Azure Event Hubs via the Azure Functions runtime and converts them to OpenTelemetry format for further processing and export.
The Azure Functions receiver is an OpenTelemetry Collector receiver that integrates with Azure Functions as a custom handler. It receives logs from Azure Event Hubs via the Azure Functions runtime and converts them to OpenTelemetry format for further processing and export. The receiver is structured by trigger type (e.g. Event Hub); each trigger can expose multiple log bindings with different encodings.

## How It Works

The receiver is designed to operate as part of an Azure Functions custom handler:

1. Azure Functions runtime consumes events from Azure Event Hubs (e.g. separate hubs for logs and metrics).
2. The runtime sends HTTP POST requests to the receiver's endpoint `/logs`.
2. The runtime sends HTTP POST requests to the receiver's endpoints (e.g. `/logs`, `/raw_logs`) according to the configured bindings.
3. The receiver decodes Azure Functions invoke requests containing Event Hub messages.
4. Messages are converted to OpenTelemetry format.
4. Messages are converted to OpenTelemetry format using the encoding extension configured per binding.
5. Data is forwarded to the configured pipeline consumers.

## Configuration
Expand All @@ -34,10 +34,12 @@ The following receiver configuration parameters are supported.
|------|------|-------------|
| `http` | confighttp.ServerConfig | **Required.** HTTP server settings (e.g. `endpoint: :9090`). Typically use `FUNCTIONS_CUSTOMHANDLER_PORT`. |
| `auth` | component.ID | Optional. Component ID of the extension that provides Azure authentication (e.g. token credential). |
| `logs.encoding` | component.ID | **Required.** Encoding extension ID for unmarshaling log records. |
| `include_invoke_metadata` | bool | Optional. When true, add Azure Functions invoke metadata to resource attributes. Default: false. |
| `triggers` | object | **Required.** Trigger configuration (e.g. Event Hub). At least one trigger with at least one log binding is required. |
| `triggers.event_hub` | object | Event Hub trigger configuration. Defines log bindings and metadata behavior. |
| `triggers.event_hub.logs` | list | List of log bindings. Each entry has `name` (binding name; maps to path `/<name>`) and `encoding` (component.ID). Binding names must be unique. |
| `triggers.event_hub.include_metadata` | bool | Optional. When true, add Azure Functions invoke metadata (e.g. Event Hub partition context) to resource attributes. Default: false. |

Required fields must be set for the receiver to start.
The `triggers` section and at least one trigger with at least one log binding are required for the receiver to register endpoints. Required fields must be set for the receiver to start.

### Example configuration

Expand All @@ -48,24 +50,29 @@ receivers:
http:
endpoint: :${env:FUNCTIONS_CUSTOMHANDLER_PORT:-9090}

# Logs configuration
logs:
# Encoding extension ID for log unmarshaling
# Must reference an encoding extension defined in the extensions section
encoding: azure_encoding
# Optional: Azure auth extension
auth: azureauth

# Include Azure Functions invoke metadata in resource attributes
# When enabled, adds partition context and system metadata
include_invoke_metadata: true
# Triggers: Event Hub with multiple log bindings, each with its own encoding
triggers:
event_hub:
logs:
- name: logs
encoding: azure_encoding
- name: raw_logs
encoding: azureresourcelogs_encoding
include_metadata: true

extensions:
azureauth:
# Azure auth extension configuration
azure_encoding:
# Encoding extension configuration
azureresourcelogs_encoding:
# Encoding extension configuration

service:
extensions: [azureauth, azure_encoding]
extensions: [azureauth, azure_encoding, azureresourcelogs_encoding]
pipelines:
logs:
receivers: [azure_functions]
Expand All @@ -74,11 +81,11 @@ service:

## Supported Signal Decoders

- **Logs** (Primary support) - Logs are decoded using an encoding extension (typically `azure_encoding`) that converts Azure Resource Logs format to OpenTelemetry logs.
- **Logs** (Primary support) Logs are decoded using an encoding extension per binding (e.g. `azure_encoding`) that converts the binding payload to OpenTelemetry logs.
- **Metrics** (Future consideration)

## Requirements

- Deployed as an Azure Functions custom handler.
- Azure Functions host configuration (`host.json`) with custom handler settings.
- Event Hub trigger bindings configured in `function.json`.
- Event Hub trigger bindings configured in `function.json`; binding names should match the `triggers.event_hub.logs[].name` values (e.g. `logs`, `raw_logs`).
60 changes: 49 additions & 11 deletions receiver/azurefunctionsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azurefunctionsreceiver // import "github.com/open-telemetry/opentelemetr

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
Expand All @@ -14,21 +15,39 @@ type Config struct {
// HTTP defines the HTTP server settings for the Azure Functions invoke endpoints.
HTTP *confighttp.ServerConfig `mapstructure:"http"`

// Logs defines configuration for log records received from Azure Functions.
Logs EncodingConfig `mapstructure:"logs"`

// Auth is the component.ID of the extension that provides Azure authentication
Auth component.ID `mapstructure:"auth"`

// IncludeInvokeMetadata, when true, adds Azure Functions invoke metadata to resource attributes.
IncludeInvokeMetadata bool `mapstructure:"include_invoke_metadata"`
// Triggers holds configuration for Azure Functions triggers (e.g. Event Hub)
Triggers *TriggersConfig `mapstructure:"triggers"`
}

// TriggersConfig groups all supported trigger types for this receiver.
type TriggersConfig struct {
// EventHub configures the Event Hub trigger: log bindings and their encodings.
EventHub *EventHubTriggerConfig `mapstructure:"event_hub"`

_ struct{} // prevent unkeyed literal initialization
}

// EventHubTriggerConfig holds configuration for the Event Hub trigger.
type EventHubTriggerConfig struct {
// Logs is the list of log bindings (e.g. name "logs" maps to path /logs). Each binding has its own encoding.
Logs []LogsEncodingConfig `mapstructure:"logs"`
// IncludeMetadata, when true, adds Azure Functions invoke metadata to resource attributes.
IncludeMetadata bool `mapstructure:"include_metadata"`
}

// EncodingConfig holds the encoding extension configuration for a signal type.
type EncodingConfig struct {
// Encoding identifies the encoding of log records that triggered azure functions.
// LogsEncodingConfig holds the binding name and encoding for a signal (e.g. one log binding).
// Name is the Azure Functions binding name and typically corresponds to the request path (e.g. /logs, /raw_logs).
type LogsEncodingConfig struct {
Name string `mapstructure:"name"`
Encoding component.ID `mapstructure:"encoding"`
_ struct{} // Prevent unkeyed literal initialization
}

// hasAnyBinding reports whether at least one trigger has at least one binding.
func (t *TriggersConfig) hasAnyBinding() bool {
return t.EventHub != nil && len(t.EventHub.Logs) > 0
}

// Validate checks if the receiver configuration is valid.
Expand All @@ -38,8 +57,27 @@ func (cfg *Config) Validate() error {
errs = append(errs, errors.New("missing http server settings"))
}

if cfg.Logs.Encoding == (component.ID{}) {
errs = append(errs, errors.New("logs.encoding must be set"))
if cfg.Triggers == nil {
errs = append(errs, errors.New("missing triggers configuration"))
} else if !cfg.Triggers.hasAnyBinding() {
errs = append(errs, errors.New("at least one configured trigger with at least one binding is required"))
}

if cfg.Triggers != nil && cfg.Triggers.EventHub != nil {
eh := cfg.Triggers.EventHub
seen := make(map[string]struct{}, len(eh.Logs))
for i, log := range eh.Logs {
if log.Name == "" {
errs = append(errs, fmt.Errorf("triggers.event_hub.logs[%d].name must be set", i))
} else if _, ok := seen[log.Name]; ok {
errs = append(errs, fmt.Errorf("triggers.event_hub.logs: duplicate binding name %q", log.Name))
} else {
seen[log.Name] = struct{}{}
}
if log.Encoding.String() == "" {
errs = append(errs, fmt.Errorf("triggers.event_hub.logs[%d].encoding must be set", i))
}
}
}

return errors.Join(errs...)
Expand Down
37 changes: 28 additions & 9 deletions receiver/azurefunctionsreceiver/config.schema.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,33 @@
$defs:
encoding_config:
description: EncodingConfig holds the encoding extension configuration for a signal type.
event_hub_trigger_config:
description: EventHubTriggerConfig holds configuration for the Event Hub trigger.
type: object
properties:
include_metadata:
description: IncludeMetadata, when true, adds Azure Functions invoke metadata to resource attributes.
type: boolean
logs:
description: Logs is the list of log bindings (e.g. name "logs" maps to path /logs). Each binding has its own encoding.
type: array
items:
$ref: logs_encoding_config
logs_encoding_config:
description: LogsEncodingConfig holds the binding name and encoding for a signal (e.g. one log binding). Name is the Azure Functions binding name and typically corresponds to the request path (e.g. /logs, /raw_logs).
type: object
properties:
encoding:
description: Encoding identifies the encoding of log records that triggered azure functions.
type: string
x-customType: go.opentelemetry.io/collector/component.ID
name:
type: string
triggers_config:
description: TriggersConfig groups all supported trigger types for this receiver.
type: object
properties:
event_hub:
description: 'EventHub configures the Event Hub trigger: log bindings and their encodings.'
x-pointer: true
$ref: event_hub_trigger_config
type: object
properties:
auth:
Expand All @@ -17,9 +38,7 @@ properties:
description: HTTP defines the HTTP server settings for the Azure Functions invoke endpoints.
x-pointer: true
$ref: go.opentelemetry.io/collector/config/confighttp.server_config
include_invoke_metadata:
description: IncludeInvokeMetadata, when true, adds Azure Functions invoke metadata to resource attributes.
type: boolean
logs:
description: Logs defines configuration for log records received from Azure Functions.
$ref: encoding_config
triggers:
description: Triggers holds configuration for Azure Functions triggers (e.g. Event Hub)
x-pointer: true
$ref: triggers_config
44 changes: 39 additions & 5 deletions receiver/azurefunctionsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,57 @@ func TestLoadConfig(t *testing.T) {
expected: &Config{
HTTP: &confighttp.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: "test:123", Transport: confignet.TransportTypeTCP}},
Auth: component.MustNewID("azureauth"),
Logs: EncodingConfig{Encoding: component.MustNewID("azure_encoding")},
Triggers: &TriggersConfig{
EventHub: &EventHubTriggerConfig{
Logs: []LogsEncodingConfig{
{Name: "logs", Encoding: component.MustNewID("azure_encoding")},
{Name: "raw_logs", Encoding: component.MustNewID("azureresourcelogs_encoding")},
},
IncludeMetadata: true,
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "no_auth"),
expected: &Config{
HTTP: &confighttp.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: "test:123", Transport: confignet.TransportTypeTCP}},
Logs: EncodingConfig{Encoding: component.MustNewID("azure_encoding")},
Triggers: &TriggersConfig{
EventHub: &EventHubTriggerConfig{
Logs: []LogsEncodingConfig{
{Name: "logs", Encoding: component.MustNewID("azure_encoding")},
},
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "no_http"),
id: component.NewIDWithName(metadata.Type, "no_http_config"),
expectedErrMessage: "missing http server settings",
},
{
id: component.NewIDWithName(metadata.Type, "missing_logs_encoding"),
expectedErrMessage: "logs.encoding must be set",
id: component.NewIDWithName(metadata.Type, "no_triggers"),
expectedErrMessage: "missing triggers configuration",
},
{
id: component.NewIDWithName(metadata.Type, "no_event_hub"),
expectedErrMessage: "at least one configured trigger with at least one binding is required",
},
{
id: component.NewIDWithName(metadata.Type, "empty_event_hub_logs"),
expectedErrMessage: "at least one configured trigger with at least one binding is required",
},
{
id: component.NewIDWithName(metadata.Type, "missing_binding_name"),
expectedErrMessage: "triggers.event_hub.logs[0].name must be set",
},
{
id: component.NewIDWithName(metadata.Type, "missing_binding_encoding"),
expectedErrMessage: "triggers.event_hub.logs[0].encoding must be set",
},
{
id: component.NewIDWithName(metadata.Type, "duplicate_binding_name"),
expectedErrMessage: `triggers.event_hub.logs: duplicate binding name "logs"`,
},
}
for _, tt := range tests {
Expand Down
40 changes: 40 additions & 0 deletions receiver/azurefunctionsreceiver/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azurefunctionsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azurefunctionsreceiver"

import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
)

// loadEncodingExtension loads an extension by ID from the host
// Returns an error if the extension is missing or does not implement the expected type.
func loadEncodingExtension[T any](host component.Host, id component.ID, signalType string) (T, error) {
var zero T
ext, ok := host.GetExtensions()[id]
if !ok {
return zero, fmt.Errorf("extension %q not found", id.String())
}
u, ok := ext.(T)
if !ok {
return zero, fmt.Errorf("extension %q is not a %s unmarshaler", id.String(), signalType)
}
return u, nil
}

// loadLogsUnmarshalers builds a map of binding name to plog.Unmarshaler by loading
// each encoding extension from the host
func loadLogsUnmarshalers(host component.Host, bindings []LogsEncodingConfig) (map[string]plog.Unmarshaler, error) {
out := make(map[string]plog.Unmarshaler, len(bindings))
for _, b := range bindings {
u, err := loadEncodingExtension[plog.Unmarshaler](host, b.Encoding, "logs")
if err != nil {
return nil, fmt.Errorf("binding %q: %w", b.Name, err)
}
out[b.Name] = u
}
return out, nil
}
Loading
Loading