Skip to content
Merged
8 changes: 8 additions & 0 deletions .chloggen/libhoney-full-response.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
change_type: bug_fix
component: libhoneyreceiver
note: return full array of statuses per event
issues: [42272]
subtext:
Libhoney clients expect a response containing one status code per event within each batch received.
This is now implemented for both initial parsing errors and downstream consumer errors.
change_logs: [user]
2 changes: 1 addition & 1 deletion receiver/libhoneyreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The following settings are required:

- `http`
- `endpoint` must set an endpoint. Defaults to `127.0.0.1:8080`
- `compression_algorithms` (optional): List of supported compression algorithms. Defaults to `["", "gzip", "zstd", "zlib", "snappy", "deflate"]`. Set to `[]` to disable automatic decompression.
- `compression_algorithms` (optional): List of supported compression algorithms. Defaults to `["", "zstd", "gzip", "deflate"]`. Set to `[]` to disable automatic decompression.
- `resources`: if the `service.name` field is different, map it here.
- `scopes`: to get the `library.name` and `library.version` set in the scope section, set them here.
- `attributes`: if the other trace-related data have different keys, map them here, defaults are otlp-like field names.
Expand Down
2 changes: 1 addition & 1 deletion receiver/libhoneyreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func createDefaultConfig() component.Config {
HTTP: configoptional.Default(HTTPConfig{
ServerConfig: confighttp.ServerConfig{
Endpoint: endpointStr,
CompressionAlgorithms: []string{},
CompressionAlgorithms: []string{"", "zstd", "gzip", "deflate"},
},
TracesURLPaths: defaultTracesURLPaths,
}),
Expand Down
103 changes: 93 additions & 10 deletions receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"

import (
"bytes"
"crypto/rand"
"encoding/binary"
"encoding/hex"
Expand All @@ -15,6 +16,7 @@ import (
"strings"
"time"

"github.com/vmihailenco/msgpack/v5"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -72,15 +74,64 @@ func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error {
if err != nil {
return err
}
if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" {
// neither timestamp was set. give it right now.
tmp.Time = tstr
tnow := time.Now()
tmp.MsgPackTimestamp = &tnow
if tmp.MsgPackTimestamp == nil || tmp.MsgPackTimestamp.IsZero() {
if tmp.Time == "none" {
tmp.Time = tstr
tnow := time.Now()
tmp.MsgPackTimestamp = &tnow
} else {
propertime := eventtime.GetEventTime(tmp.Time)
tmp.MsgPackTimestamp = &propertime
}
}

*l = LibhoneyEvent(tmp)
return nil
}

// UnmarshalMsgpack overrides the unmarshall to make sure the MsgPackTimestamp is set
func (l *LibhoneyEvent) UnmarshalMsgpack(data []byte) error {
type _libhoneyEvent LibhoneyEvent
tstr := eventtime.GetEventTimeDefaultString()
tzero := time.Time{}
tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1}

// Use a temporary struct to avoid recursion
type tempEvent struct {
Samplerate int `msgpack:"samplerate"`
MsgPackTimestamp *time.Time `msgpack:"time"`
Time string `msgpack:"-"` // Ignore during msgpack unmarshal
Data map[string]any `msgpack:"data"`
}
if tmp.MsgPackTimestamp.IsZero() {
propertime := eventtime.GetEventTime(tmp.Time)
tmp.MsgPackTimestamp = &propertime

var tmpEvent tempEvent
// First unmarshal into the temp struct
decoder := msgpack.NewDecoder(bytes.NewReader(data))
decoder.UseLooseInterfaceDecoding(true)
err := decoder.Decode(&tmpEvent)
if err != nil {
return err
}

// Copy fields to our tmp struct
tmp.Samplerate = tmpEvent.Samplerate
tmp.MsgPackTimestamp = tmpEvent.MsgPackTimestamp
tmp.Data = tmpEvent.Data

// Check if Time field exists in Data and extract it
if timeStr, ok := tmpEvent.Data["time"].(string); ok {
tmp.Time = timeStr
}

if tmp.MsgPackTimestamp == nil || tmp.MsgPackTimestamp.IsZero() {
if tmp.Time == "none" {
tmp.Time = tstr
tnow := time.Now()
tmp.MsgPackTimestamp = &tnow
} else {
propertime := eventtime.GetEventTime(tmp.Time)
tmp.MsgPackTimestamp = &propertime
}
}

*l = LibhoneyEvent(tmp)
Expand Down Expand Up @@ -218,7 +269,23 @@ type ServiceHistory struct {

// ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord
func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error {
timeNs := l.MsgPackTimestamp.UnixNano()
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
var timeNs int64
if l.MsgPackTimestamp != nil {
timeNs = l.MsgPackTimestamp.UnixNano()
} else {
// Parse time from Time field or use current time
if l.Time != "" {
parsedTime, err := time.Parse(time.RFC3339, l.Time)
if err == nil {
timeNs = parsedTime.UnixNano()
} else {
timeNs = time.Now().UnixNano()
}
} else {
timeNs = time.Now().UnixNano()
}
}
logger.Debug("processing log with", zap.Int64("timestamp", timeNs))
newLog.SetTimestamp(pcommon.Timestamp(timeNs))

Expand Down Expand Up @@ -294,7 +361,23 @@ func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) {

// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span
func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error {
timeNs := l.MsgPackTimestamp.UnixNano()
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
var timeNs int64
if l.MsgPackTimestamp != nil {
timeNs = l.MsgPackTimestamp.UnixNano()
} else {
// Parse time from Time field or use current time
if l.Time != "" {
parsedTime, err := time.Parse(time.RFC3339, l.Time)
if err == nil {
timeNs = parsedTime.UnixNano()
} else {
timeNs = time.Now().UnixNano()
}
} else {
timeNs = time.Now().UnixNano()
}
}
logger.Debug("processing trace with", zap.Int64("timestamp", timeNs))

if pid, ok := l.Data[cfg.Attributes.ParentID]; ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmihailenco/msgpack/v5"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -610,3 +611,99 @@ func TestGetParentID(t *testing.T) {
})
}
}

// TestLibhoneyEvent_UnmarshalMsgpack exercises the custom msgpack
// unmarshaller across payloads that carry no timestamp, a zero timestamp,
// a real timestamp, or a "time" string nested inside the data map. The
// invariant under test: MsgPackTimestamp must never be nil afterwards.
func TestLibhoneyEvent_UnmarshalMsgpack(t *testing.T) {
	type testCase struct {
		name                  string
		msgpackData           map[string]any
		expectNonNilTimestamp bool
		wantErr               bool
	}

	cases := []testCase{
		{
			name: "msgpack with nil timestamp",
			msgpackData: map[string]any{
				"data": map[string]any{
					"key": "value",
				},
				"samplerate": 1,
				// no "time" key at all
			},
			expectNonNilTimestamp: true,
		},
		{
			name: "msgpack with time string in data",
			msgpackData: map[string]any{
				"data": map[string]any{
					"key":  "value",
					"time": "2024-01-01T00:00:00Z",
				},
				"samplerate": 2,
			},
			expectNonNilTimestamp: true,
		},
		{
			name: "msgpack with timestamp field",
			msgpackData: map[string]any{
				"time": time.Now(),
				"data": map[string]any{
					"key": "value",
				},
				"samplerate": 3,
			},
			expectNonNilTimestamp: true,
		},
		{
			name: "msgpack with zero timestamp",
			msgpackData: map[string]any{
				"time": time.Time{},
				"data": map[string]any{
					"key": "value",
				},
				"samplerate": 4,
			},
			expectNonNilTimestamp: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Round-trip the fixture through msgpack so the unmarshaller
			// sees real wire bytes.
			encoded, err := msgpack.Marshal(tc.msgpackData)
			require.NoError(t, err)

			var got LibhoneyEvent
			err = got.UnmarshalMsgpack(encoded)
			if tc.wantErr {
				assert.Error(t, err)
				return
			}
			require.NoError(t, err)

			// Core invariant: the timestamp pointer is always populated.
			if tc.expectNonNilTimestamp {
				assert.NotNil(t, got.MsgPackTimestamp, "MsgPackTimestamp should never be nil after UnmarshalMsgpack")

				if got.MsgPackTimestamp != nil && !got.MsgPackTimestamp.IsZero() {
					// A usable timestamp implies the string form was set too.
					assert.NotEmpty(t, got.Time, "Time string should be set when MsgPackTimestamp is valid")
				}
			}

			// Remaining payload fields must survive the round trip.
			assert.Equal(t, tc.msgpackData["samplerate"], got.Samplerate)
			if expected, ok := tc.msgpackData["data"].(map[string]any); ok {
				// "time" may have been promoted out of data; drop it from
				// the expectation before comparing the rest.
				delete(expected, "time")
				for k, v := range expected {
					assert.Equal(t, v, got.Data[k])
				}
			}
		})
	}
}
57 changes: 52 additions & 5 deletions receiver/libhoneyreceiver/internal/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package parser // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"encoding/hex"
"errors"
"net/http"
"net/url"
"slices"
"time"
Expand All @@ -18,8 +19,15 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/response"
)

// IndexMapping tracks which original libhoney event indices became logs vs traces.
// It is populated by ToPdata in lockstep with the incoming event slice, so that
// per-signal outcomes can later be mapped back to positions in the original
// batch (e.g. when assembling the per-event response status array).
type IndexMapping struct {
LogIndices []int // Original event indices that became logs
TraceIndices []int // Original event indices that became traces/spans/span events
}

// GetDatasetFromRequest extracts the dataset name from the request path
func GetDatasetFromRequest(path string) (string, error) {
if path == "" {
Expand All @@ -32,8 +40,8 @@ func GetDatasetFromRequest(path string) (string, error) {
return dataset, nil
}

// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object
func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) (plog.Logs, ptrace.Traces) {
// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object and tracks which original indices became what
func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) (plog.Logs, ptrace.Traces, IndexMapping, []response.ResponseInBatch) {
foundServices := libhoneyevent.ServiceHistory{}
foundServices.NameCount = make(map[string]int)
foundScopes := libhoneyevent.ScopeHistory{}
Expand All @@ -42,6 +50,15 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
spanLinks := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}
spanEvents := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}

// Initialize index mapping to track which original events become logs vs traces
indexMapping := IndexMapping{
LogIndices: make([]int, 0),
TraceIndices: make([]int, 0),
}

// Initialize parsing results to track success/failure per original event
parsingResults := make([]response.ResponseInBatch, len(lhes))

foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes

alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion}
Expand All @@ -51,7 +68,7 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
cfg.Attributes.Error, cfg.Attributes.SpanKind,
}

for _, lhe := range lhes {
for i, lhe := range lhes {
parentID, err := lhe.GetParentID(cfg.Attributes.ParentID)
if err != nil {
logger.Debug("parent id not found")
Expand All @@ -68,6 +85,16 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
err := lhe.ToPTraceSpan(&newSpan, &alreadyUsedFields, cfg, logger)
if err != nil {
logger.Warn("span could not be converted from libhoney to ptrace", zap.String("span.object", lhe.DebugString()))
parsingResults[i] = response.ResponseInBatch{
Status: http.StatusBadRequest,
ErrorStr: "span parsing failed: " + err.Error(),
}
} else {
// Track successful span for consumer processing
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
parsingResults[i] = response.ResponseInBatch{
Status: http.StatusAccepted,
}
}
case "log":
logService, _ := lhe.GetService(cfg, &foundServices, dataset)
Expand All @@ -76,10 +103,24 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
err := lhe.ToPLogRecord(&newLog, &alreadyUsedFields, logger)
if err != nil {
logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString()))
parsingResults[i] = response.ResponseInBatch{
Status: http.StatusBadRequest,
ErrorStr: "log parsing failed: " + err.Error(),
}
} else {
// Track successful log for consumer processing
indexMapping.LogIndices = append(indexMapping.LogIndices, i)
parsingResults[i] = response.ResponseInBatch{
Status: http.StatusAccepted,
}
}
case "span_event":
// Span events are processed later, so we need index mapping for them
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
spanEvents[parentID] = append(spanEvents[parentID], lhe)
case "span_link":
// Span links are processed later, so we need index mapping for them
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
spanLinks[parentID] = append(spanLinks[parentID], lhe)
}
}
Expand Down Expand Up @@ -127,13 +168,19 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
}
}

return resultLogs, resultTraces
return resultLogs, resultTraces, indexMapping, parsingResults
}

func addSpanEventsToSpan(sp ptrace.Span, events []libhoneyevent.LibhoneyEvent, alreadyUsedFields []string, logger *zap.Logger) {
for _, spe := range events {
newEvent := sp.Events().AppendEmpty()
newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano()))
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
if spe.MsgPackTimestamp != nil {
newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano()))
} else {
// Use current time if timestamp is not available
newEvent.SetTimestamp(pcommon.Timestamp(time.Now().UnixNano()))
}
newEvent.SetName(spe.Data["name"].(string))
for lkey, lval := range spe.Data {
if slices.Contains(alreadyUsedFields, lkey) {
Expand Down
Loading
Loading