From 3e54ada556a73457adcb1a886f9cc1675d9060e8 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Fri, 21 Mar 2025 10:58:51 -0600 Subject: [PATCH] Proposal for Custom Beholder Messages The default custom message utility uses a `BaseMessage` proto with the message data expected to be a string and labels added to the proto message. Instead, a proto message itself can be sent where the message and schema details are packaged together by an interface. This promotes the design that any particular entity can be emitted given that it satisfies the interface. --- pkg/custmsg/custom_message.go | 60 +++++++++++++++++++++++--- pkg/workflows/wasm/host/module.go | 2 +- pkg/workflows/wasm/host/module_test.go | 12 +++--- pkg/workflows/wasm/host/wasm_test.go | 10 ++--- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/pkg/custmsg/custom_message.go b/pkg/custmsg/custom_message.go index 49ed8c459..57b3fdbf8 100644 --- a/pkg/custmsg/custom_message.go +++ b/pkg/custmsg/custom_message.go @@ -2,6 +2,7 @@ package custmsg import ( "context" + "errors" "fmt" "maps" @@ -13,7 +14,7 @@ import ( type MessageEmitter interface { // Emit sends a message to the labeler's destination. - Emit(context.Context, string) error + Emit(context.Context, any) error // WithMapLabels sets the labels for the message to be emitted. Labels are cumulative. WithMapLabels(map[string]string) MessageEmitter @@ -25,6 +26,18 @@ type MessageEmitter interface { Labels() map[string]string } +type ProtoDetail struct { + Schema string + Domain string + Entity string +} + +// ProtoMessage is intended to be a pure function that provides a message and details required +// for publishing to beholder. +type ProtoMessage interface { + BeholderMessage() (proto.Message, ProtoDetail) +} + type Labeler struct { labels map[string]string } @@ -69,8 +82,21 @@ func (l Labeler) With(keyValues ...string) MessageEmitter { return newCustomMessageLabeler } -func (l Labeler) Emit(ctx context.Context, msg string) error { - return sendLogAsCustomMessageW(ctx, msg, l.labels) +func (l Labeler) Emit(ctx context.Context, msg any) error { + switch typed := msg.(type) { + case string: + return sendLogAsStringMessageW(ctx, typed, l.labels) + default: + protoMsg, ok := msg.(ProtoMessage) + if !ok { + // TODO: can default to JSON encoding instead of returning an error + return errors.New("must be a proto message") + } + + custMsg, desc := protoMsg.BeholderMessage() + + return sendLogAsCustomMessageW(ctx, desc, custMsg, l.labels) + } } func (l Labeler) Labels() map[string]string { @@ -84,10 +110,10 @@ func (l Labeler) Labels() map[string]string { // SendLogAsCustomMessage emits a BaseMessage With msg and labels as data. // any key in labels that is not part of orderedLabelKeys will not be transmitted func (l Labeler) SendLogAsCustomMessage(ctx context.Context, msg string) error { - return sendLogAsCustomMessageW(ctx, msg, l.labels) + return sendLogAsStringMessageW(ctx, msg, l.labels) } -func sendLogAsCustomMessageW(ctx context.Context, msg string, labels map[string]string) error { +func sendLogAsStringMessageW(ctx context.Context, msg string, labels map[string]string) error { // TODO un-comment after INFOPLAT-1386 // cast to map[string]any //newLabels := map[string]any{} @@ -121,3 +147,27 @@ func sendLogAsCustomMessageW(ctx context.Context, msg string, labels map[string] return nil } + +func sendLogAsCustomMessageW(ctx context.Context, desc ProtoDetail, msg proto.Message, labels map[string]string) error { + payloadBytes, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("sending custom message failed to marshal protobuf: %w", err) + } + + kvs := []any{ + "beholder_data_schema", desc.Schema, // required + "beholder_domain", desc.Domain, // required + "beholder_entity", desc.Entity, // required + } + + for key, value := range labels { + kvs = append(kvs, []any{key, value}...) + } + + err = beholder.GetEmitter().Emit(ctx, payloadBytes, kvs...) + if err != nil { + return fmt.Errorf("sending custom message failed on emit: %w", err) + } + + return nil +} diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 2f31c8d46..f04e86955 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -665,7 +665,7 @@ func createLogFn(logger logger.Logger) func(caller *wasmtime.Caller, ptr int32, type unimplementedMessageEmitter struct{} -func (u *unimplementedMessageEmitter) Emit(context.Context, string) error { +func (u *unimplementedMessageEmitter) Emit(context.Context, any) error { return errors.New("unimplemented") } diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index 1112197c9..552bf5f94 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -19,11 +19,11 @@ import ( ) type mockMessageEmitter struct { - e func(context.Context, string, map[string]string) error + e func(context.Context, any, map[string]string) error labels map[string]string } -func (m *mockMessageEmitter) Emit(ctx context.Context, msg string) error { +func (m *mockMessageEmitter) Emit(ctx context.Context, msg any) error { return m.e(ctx, msg, m.labels) } @@ -41,7 +41,7 @@ func (m *mockMessageEmitter) Labels() map[string]string { return m.labels } -func newMockMessageEmitter(e func(context.Context, string, map[string]string) error) custmsg.MessageEmitter { +func newMockMessageEmitter(e func(context.Context, any, map[string]string) error) custmsg.MessageEmitter { return &mockMessageEmitter{e: e} } @@ -65,7 +65,7 @@ func Test_createEmitFn(t *testing.T) { emitFn := createEmitFn( logger.Test(t), store, - newMockMessageEmitter(func(ctx context.Context, _ string, _ map[string]string) error { + newMockMessageEmitter(func(ctx context.Context, _ any, _ map[string]string) error { v := ctx.Value(ctxKey) assert.Equal(t, ctxValue, v) return nil @@ -106,7 +106,7 @@ func Test_createEmitFn(t *testing.T) { emitFn := createEmitFn( logger.Test(t), store, - newMockMessageEmitter(func(_ context.Context, _ string, _ map[string]string) error { + newMockMessageEmitter(func(_ context.Context, _ any, _ map[string]string) error { return nil }), unsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) { @@ -176,7 +176,7 @@ func Test_createEmitFn(t *testing.T) { emitFn := createEmitFn( logger.Test(t), store, - newMockMessageEmitter(func(_ context.Context, _ string, _ map[string]string) error { + newMockMessageEmitter(func(_ context.Context, _ any, _ map[string]string) error { return assert.AnError }), unsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) { diff --git a/pkg/workflows/wasm/host/wasm_test.go b/pkg/workflows/wasm/host/wasm_test.go index a83a80786..45c99a671 100644 --- a/pkg/workflows/wasm/host/wasm_test.go +++ b/pkg/workflows/wasm/host/wasm_test.go @@ -271,7 +271,7 @@ func Test_Compute_Emit(t *testing.T) { Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, - Labeler: newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error { + Labeler: newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error { t.Helper() v := ctx.Value(ctxKey) @@ -301,7 +301,7 @@ func Test_Compute_Emit(t *testing.T) { Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, - Labeler: newMockMessageEmitter(func(_ context.Context, msg string, kvs map[string]string) error { + Labeler: newMockMessageEmitter(func(_ context.Context, msg any, kvs map[string]string) error { t.Helper() assert.Equal(t, "testing emit", msg) @@ -343,7 +343,7 @@ func Test_Compute_Emit(t *testing.T) { Logger: lggr, Fetch: fetchFunc, IsUncompressed: true, - Labeler: newMockMessageEmitter(func(_ context.Context, msg string, labels map[string]string) error { + Labeler: newMockMessageEmitter(func(_ context.Context, msg any, labels map[string]string) error { return nil }), // never called }, binary) @@ -1265,7 +1265,7 @@ func TestModule_MaxResponseSizeBytesLimit(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(emitBinaryCmd, emitBinaryLocation, true, t) - emitter := newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error { + emitter := newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error { return errors.New("some error") }) // an emitter response with an error "some error" when marshaled is 14 bytes @@ -1301,7 +1301,7 @@ func TestModule_MaxResponseSizeBytesLimit(t *testing.T) { ctx := tests.Context(t) binary := createTestBinary(emitBinaryCmd, emitBinaryLocation, true, t) - emitter := newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error { + emitter := newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error { return errors.New("some error") })