Skip to content

Commit 442d8b8

Browse files
authored
[receiver/libhoney] Response body sends proper partial errors for batches (#42272)
#### Description - Responds to libhoney clients with an array of per-event success and failure statuses for proper error handling - Sets both the JSON and MsgPack timestamps to avoid nil timestamps #### Testing Adds unit tests for timestamps and response body generation
1 parent 65b8858 commit 442d8b8

File tree

10 files changed

+416
-85
lines changed

10 files changed

+416
-85
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
change_type: bug_fix
2+
component: libhoneyreceiver
3+
note: return full array of statuses per event
4+
issues: [42272]
5+
subtext:
6+
Libhoney expects the response to each batch to contain an array with one status code per event in that batch.
7+
This has now been implemented for both initial parsing errors and downstream consumer errors.
8+
change_logs: [user]

receiver/libhoneyreceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ The following settings are required:
2828

2929
- `http`
3030
- `endpoint` must set an endpoint. Defaults to `127.0.0.1:8080`
31-
- `compression_algorithms` (optional): List of supported compression algorithms. Defaults to `["", "gzip", "zstd", "zlib", "snappy", "deflate"]`. Set to `[]` to disable automatic decompression.
31+
- `compression_algorithms` (optional): List of supported compression algorithms. Defaults to `["", "zstd", "gzip", "deflate"]`. Set to `[]` to disable automatic decompression.
3232
- `resources`: if the `service.name` field is different, map it here.
3333
- `scopes`: to get the `library.name` and `library.version` set in the scope section, set them here.
3434
- `attributes`: if the other trace-related data have different keys, map them here, defaults are otlp-like field names.

receiver/libhoneyreceiver/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func createDefaultConfig() component.Config {
4242
HTTP: configoptional.Default(HTTPConfig{
4343
ServerConfig: confighttp.ServerConfig{
4444
Endpoint: endpointStr,
45-
CompressionAlgorithms: []string{},
45+
CompressionAlgorithms: []string{"", "zstd", "gzip", "deflate"},
4646
},
4747
TracesURLPaths: defaultTracesURLPaths,
4848
}),

receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
55

66
import (
7+
"bytes"
78
"crypto/rand"
89
"encoding/binary"
910
"encoding/hex"
@@ -15,6 +16,7 @@ import (
1516
"strings"
1617
"time"
1718

19+
"github.com/vmihailenco/msgpack/v5"
1820
"go.opentelemetry.io/collector/pdata/pcommon"
1921
"go.opentelemetry.io/collector/pdata/plog"
2022
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -72,15 +74,64 @@ func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error {
7274
if err != nil {
7375
return err
7476
}
75-
if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" {
76-
// neither timestamp was set. give it right now.
77-
tmp.Time = tstr
78-
tnow := time.Now()
79-
tmp.MsgPackTimestamp = &tnow
77+
if tmp.MsgPackTimestamp == nil || tmp.MsgPackTimestamp.IsZero() {
78+
if tmp.Time == "none" {
79+
tmp.Time = tstr
80+
tnow := time.Now()
81+
tmp.MsgPackTimestamp = &tnow
82+
} else {
83+
propertime := eventtime.GetEventTime(tmp.Time)
84+
tmp.MsgPackTimestamp = &propertime
85+
}
86+
}
87+
88+
*l = LibhoneyEvent(tmp)
89+
return nil
90+
}
91+
92+
// UnmarshalMsgpack overrides the default msgpack unmarshalling to make sure MsgPackTimestamp is always set
93+
func (l *LibhoneyEvent) UnmarshalMsgpack(data []byte) error {
94+
type _libhoneyEvent LibhoneyEvent
95+
tstr := eventtime.GetEventTimeDefaultString()
96+
tzero := time.Time{}
97+
tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1}
98+
99+
// Use a temporary struct to avoid recursion
100+
type tempEvent struct {
101+
Samplerate int `msgpack:"samplerate"`
102+
MsgPackTimestamp *time.Time `msgpack:"time"`
103+
Time string `msgpack:"-"` // Ignore during msgpack unmarshal
104+
Data map[string]any `msgpack:"data"`
80105
}
81-
if tmp.MsgPackTimestamp.IsZero() {
82-
propertime := eventtime.GetEventTime(tmp.Time)
83-
tmp.MsgPackTimestamp = &propertime
106+
107+
var tmpEvent tempEvent
108+
// First unmarshal into the temp struct
109+
decoder := msgpack.NewDecoder(bytes.NewReader(data))
110+
decoder.UseLooseInterfaceDecoding(true)
111+
err := decoder.Decode(&tmpEvent)
112+
if err != nil {
113+
return err
114+
}
115+
116+
// Copy fields to our tmp struct
117+
tmp.Samplerate = tmpEvent.Samplerate
118+
tmp.MsgPackTimestamp = tmpEvent.MsgPackTimestamp
119+
tmp.Data = tmpEvent.Data
120+
121+
// Check if Time field exists in Data and extract it
122+
if timeStr, ok := tmpEvent.Data["time"].(string); ok {
123+
tmp.Time = timeStr
124+
}
125+
126+
if tmp.MsgPackTimestamp == nil || tmp.MsgPackTimestamp.IsZero() {
127+
if tmp.Time == "none" {
128+
tmp.Time = tstr
129+
tnow := time.Now()
130+
tmp.MsgPackTimestamp = &tnow
131+
} else {
132+
propertime := eventtime.GetEventTime(tmp.Time)
133+
tmp.MsgPackTimestamp = &propertime
134+
}
84135
}
85136

86137
*l = LibhoneyEvent(tmp)
@@ -218,7 +269,23 @@ type ServiceHistory struct {
218269

219270
// ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord
220271
func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error {
221-
timeNs := l.MsgPackTimestamp.UnixNano()
272+
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
273+
var timeNs int64
274+
if l.MsgPackTimestamp != nil {
275+
timeNs = l.MsgPackTimestamp.UnixNano()
276+
} else {
277+
// Parse time from Time field or use current time
278+
if l.Time != "" {
279+
parsedTime, err := time.Parse(time.RFC3339, l.Time)
280+
if err == nil {
281+
timeNs = parsedTime.UnixNano()
282+
} else {
283+
timeNs = time.Now().UnixNano()
284+
}
285+
} else {
286+
timeNs = time.Now().UnixNano()
287+
}
288+
}
222289
logger.Debug("processing log with", zap.Int64("timestamp", timeNs))
223290
newLog.SetTimestamp(pcommon.Timestamp(timeNs))
224291

@@ -294,7 +361,23 @@ func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) {
294361

295362
// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span
296363
func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error {
297-
timeNs := l.MsgPackTimestamp.UnixNano()
364+
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
365+
var timeNs int64
366+
if l.MsgPackTimestamp != nil {
367+
timeNs = l.MsgPackTimestamp.UnixNano()
368+
} else {
369+
// Parse time from Time field or use current time
370+
if l.Time != "" {
371+
parsedTime, err := time.Parse(time.RFC3339, l.Time)
372+
if err == nil {
373+
timeNs = parsedTime.UnixNano()
374+
} else {
375+
timeNs = time.Now().UnixNano()
376+
}
377+
} else {
378+
timeNs = time.Now().UnixNano()
379+
}
380+
}
298381
logger.Debug("processing trace with", zap.Int64("timestamp", timeNs))
299382

300383
if pid, ok := l.Data[cfg.Attributes.ParentID]; ok {

receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"github.com/vmihailenco/msgpack/v5"
1314
"go.opentelemetry.io/collector/pdata/pcommon"
1415
"go.opentelemetry.io/collector/pdata/plog"
1516
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -610,3 +611,99 @@ func TestGetParentID(t *testing.T) {
610611
})
611612
}
612613
}
614+
615+
func TestLibhoneyEvent_UnmarshalMsgpack(t *testing.T) {
616+
tests := []struct {
617+
name string
618+
msgpackData map[string]any
619+
expectNonNilTimestamp bool
620+
wantErr bool
621+
}{
622+
{
623+
name: "msgpack with nil timestamp",
624+
msgpackData: map[string]any{
625+
"data": map[string]any{
626+
"key": "value",
627+
},
628+
"samplerate": 1,
629+
// time field is not set (nil)
630+
},
631+
expectNonNilTimestamp: true,
632+
},
633+
{
634+
name: "msgpack with time string in data",
635+
msgpackData: map[string]any{
636+
"data": map[string]any{
637+
"key": "value",
638+
"time": "2024-01-01T00:00:00Z",
639+
},
640+
"samplerate": 2,
641+
},
642+
expectNonNilTimestamp: true,
643+
},
644+
{
645+
name: "msgpack with timestamp field",
646+
msgpackData: map[string]any{
647+
"time": time.Now(),
648+
"data": map[string]any{
649+
"key": "value",
650+
},
651+
"samplerate": 3,
652+
},
653+
expectNonNilTimestamp: true,
654+
},
655+
{
656+
name: "msgpack with zero timestamp",
657+
msgpackData: map[string]any{
658+
"time": time.Time{},
659+
"data": map[string]any{
660+
"key": "value",
661+
},
662+
"samplerate": 4,
663+
},
664+
expectNonNilTimestamp: true,
665+
},
666+
}
667+
668+
for _, tt := range tests {
669+
t.Run(tt.name, func(t *testing.T) {
670+
// Marshal the test data to msgpack
671+
msgpackBytes, err := msgpack.Marshal(tt.msgpackData)
672+
require.NoError(t, err)
673+
674+
// Unmarshal using our custom unmarshaller
675+
var event LibhoneyEvent
676+
err = event.UnmarshalMsgpack(msgpackBytes)
677+
678+
if tt.wantErr {
679+
assert.Error(t, err)
680+
return
681+
}
682+
683+
require.NoError(t, err)
684+
685+
// The key assertion: MsgPackTimestamp should never be nil after unmarshalling
686+
if tt.expectNonNilTimestamp {
687+
assert.NotNil(t, event.MsgPackTimestamp, "MsgPackTimestamp should never be nil after UnmarshalMsgpack")
688+
689+
// Additional checks
690+
if event.MsgPackTimestamp != nil && !event.MsgPackTimestamp.IsZero() {
691+
// If we have a valid timestamp, Time string should also be set
692+
assert.NotEmpty(t, event.Time, "Time string should be set when MsgPackTimestamp is valid")
693+
}
694+
}
695+
696+
// Check that data was preserved
697+
assert.Equal(t, tt.msgpackData["samplerate"], event.Samplerate)
698+
if data, ok := tt.msgpackData["data"].(map[string]any); ok {
699+
// Remove "time" from data if it was extracted
700+
delete(data, "time")
701+
if len(data) > 0 {
702+
for k, v := range data {
703+
assert.Equal(t, v, event.Data[k])
704+
}
705+
}
706+
}
707+
})
708+
}
709+
}

receiver/libhoneyreceiver/internal/parser/parser.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package parser // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"encoding/hex"
88
"errors"
9+
"net/http"
910
"net/url"
1011
"slices"
1112
"time"
@@ -18,8 +19,15 @@ import (
1819
"go.uber.org/zap"
1920

2021
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/response"
2123
)
2224

25+
// IndexMapping tracks which original libhoney event indices became logs vs traces
26+
type IndexMapping struct {
27+
LogIndices []int // Original event indices that became logs
28+
TraceIndices []int // Original event indices that became traces/spans/span events
29+
}
30+
2331
// GetDatasetFromRequest extracts the dataset name from the request path
2432
func GetDatasetFromRequest(path string) (string, error) {
2533
if path == "" {
@@ -32,8 +40,8 @@ func GetDatasetFromRequest(path string) (string, error) {
3240
return dataset, nil
3341
}
3442

35-
// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object
36-
func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) (plog.Logs, ptrace.Traces) {
43+
// ToPdata converts a list of LibhoneyEvents to Pdata Logs and Traces objects, and returns an index mapping of which original events became logs vs. traces along with a per-event parsing result
44+
func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) (plog.Logs, ptrace.Traces, IndexMapping, []response.ResponseInBatch) {
3745
foundServices := libhoneyevent.ServiceHistory{}
3846
foundServices.NameCount = make(map[string]int)
3947
foundScopes := libhoneyevent.ScopeHistory{}
@@ -42,6 +50,15 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
4250
spanLinks := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}
4351
spanEvents := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}
4452

53+
// Initialize index mapping to track which original events become logs vs traces
54+
indexMapping := IndexMapping{
55+
LogIndices: make([]int, 0),
56+
TraceIndices: make([]int, 0),
57+
}
58+
59+
// Initialize parsing results to track success/failure per original event
60+
parsingResults := make([]response.ResponseInBatch, len(lhes))
61+
4562
foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes
4663

4764
alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion}
@@ -51,7 +68,7 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
5168
cfg.Attributes.Error, cfg.Attributes.SpanKind,
5269
}
5370

54-
for _, lhe := range lhes {
71+
for i, lhe := range lhes {
5572
parentID, err := lhe.GetParentID(cfg.Attributes.ParentID)
5673
if err != nil {
5774
logger.Debug("parent id not found")
@@ -68,6 +85,16 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
6885
err := lhe.ToPTraceSpan(&newSpan, &alreadyUsedFields, cfg, logger)
6986
if err != nil {
7087
logger.Warn("span could not be converted from libhoney to ptrace", zap.String("span.object", lhe.DebugString()))
88+
parsingResults[i] = response.ResponseInBatch{
89+
Status: http.StatusBadRequest,
90+
ErrorStr: "span parsing failed: " + err.Error(),
91+
}
92+
} else {
93+
// Track successful span for consumer processing
94+
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
95+
parsingResults[i] = response.ResponseInBatch{
96+
Status: http.StatusAccepted,
97+
}
7198
}
7299
case "log":
73100
logService, _ := lhe.GetService(cfg, &foundServices, dataset)
@@ -76,10 +103,24 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
76103
err := lhe.ToPLogRecord(&newLog, &alreadyUsedFields, logger)
77104
if err != nil {
78105
logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString()))
106+
parsingResults[i] = response.ResponseInBatch{
107+
Status: http.StatusBadRequest,
108+
ErrorStr: "log parsing failed: " + err.Error(),
109+
}
110+
} else {
111+
// Track successful log for consumer processing
112+
indexMapping.LogIndices = append(indexMapping.LogIndices, i)
113+
parsingResults[i] = response.ResponseInBatch{
114+
Status: http.StatusAccepted,
115+
}
79116
}
80117
case "span_event":
118+
// Span events are processed later, so we need index mapping for them
119+
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
81120
spanEvents[parentID] = append(spanEvents[parentID], lhe)
82121
case "span_link":
122+
// Span links are processed later, so we need index mapping for them
123+
indexMapping.TraceIndices = append(indexMapping.TraceIndices, i)
83124
spanLinks[parentID] = append(spanLinks[parentID], lhe)
84125
}
85126
}
@@ -127,13 +168,19 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
127168
}
128169
}
129170

130-
return resultLogs, resultTraces
171+
return resultLogs, resultTraces, indexMapping, parsingResults
131172
}
132173

133174
func addSpanEventsToSpan(sp ptrace.Span, events []libhoneyevent.LibhoneyEvent, alreadyUsedFields []string, logger *zap.Logger) {
134175
for _, spe := range events {
135176
newEvent := sp.Events().AppendEmpty()
136-
newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano()))
177+
// Handle cases where MsgPackTimestamp might be nil (e.g., JSON data from Refinery)
178+
if spe.MsgPackTimestamp != nil {
179+
newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano()))
180+
} else {
181+
// Use current time if timestamp is not available
182+
newEvent.SetTimestamp(pcommon.Timestamp(time.Now().UnixNano()))
183+
}
137184
newEvent.SetName(spe.Data["name"].(string))
138185
for lkey, lval := range spe.Data {
139186
if slices.Contains(alreadyUsedFields, lkey) {

0 commit comments

Comments
 (0)