Skip to content

Proposal for Custom Beholder Messages #1075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions pkg/custmsg/custom_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package custmsg

import (
"context"
"errors"
"fmt"
"maps"

Expand All @@ -13,7 +14,7 @@ import (

type MessageEmitter interface {
// Emit sends a message to the labeler's destination.
Emit(context.Context, string) error
Emit(context.Context, any) error

// WithMapLabels sets the labels for the message to be emitted. Labels are cumulative.
WithMapLabels(map[string]string) MessageEmitter
Expand All @@ -25,6 +26,18 @@ type MessageEmitter interface {
Labels() map[string]string
}

type ProtoDetail struct {
Schema string
Domain string
Entity string
}

// ProtoMessage is intended to be a pure function that provides a message and details required
// for publishing to beholder.
type ProtoMessage interface {
BeholderMessage() (proto.Message, ProtoDetail)
}

type Labeler struct {
labels map[string]string
}
Expand Down Expand Up @@ -69,8 +82,21 @@ func (l Labeler) With(keyValues ...string) MessageEmitter {
return newCustomMessageLabeler
}

func (l Labeler) Emit(ctx context.Context, msg string) error {
return sendLogAsCustomMessageW(ctx, msg, l.labels)
func (l Labeler) Emit(ctx context.Context, msg any) error {
switch typed := msg.(type) {
case string:
return sendLogAsStringMessageW(ctx, typed, l.labels)
default:
protoMsg, ok := msg.(ProtoMessage)
if !ok {
// TODO: can default to JSON encoding instead of returning an error
return errors.New("must be a proto message")
}

custMsg, desc := protoMsg.BeholderMessage()

return sendLogAsCustomMessageW(ctx, desc, custMsg, l.labels)
}
}

func (l Labeler) Labels() map[string]string {
Expand All @@ -84,10 +110,10 @@ func (l Labeler) Labels() map[string]string {
// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (l Labeler) SendLogAsCustomMessage(ctx context.Context, msg string) error {
return sendLogAsCustomMessageW(ctx, msg, l.labels)
return sendLogAsStringMessageW(ctx, msg, l.labels)
}

func sendLogAsCustomMessageW(ctx context.Context, msg string, labels map[string]string) error {
func sendLogAsStringMessageW(ctx context.Context, msg string, labels map[string]string) error {
// TODO un-comment after INFOPLAT-1386
// cast to map[string]any
//newLabels := map[string]any{}
Expand Down Expand Up @@ -121,3 +147,27 @@ func sendLogAsCustomMessageW(ctx context.Context, msg string, labels map[string]

return nil
}

func sendLogAsCustomMessageW(ctx context.Context, desc ProtoDetail, msg proto.Message, labels map[string]string) error {
payloadBytes, err := proto.Marshal(msg)
if err != nil {
return fmt.Errorf("sending custom message failed to marshal protobuf: %w", err)
}

kvs := []any{
"beholder_data_schema", desc.Schema, // required
"beholder_domain", desc.Domain, // required
"beholder_entity", desc.Entity, // required
}

for key, value := range labels {
kvs = append(kvs, []any{key, value}...)
}

err = beholder.GetEmitter().Emit(ctx, payloadBytes, kvs...)
if err != nil {
return fmt.Errorf("sending custom message failed on emit: %w", err)
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func createLogFn(logger logger.Logger) func(caller *wasmtime.Caller, ptr int32,

type unimplementedMessageEmitter struct{}

func (u *unimplementedMessageEmitter) Emit(context.Context, string) error {
func (u *unimplementedMessageEmitter) Emit(context.Context, any) error {
return errors.New("unimplemented")
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/workflows/wasm/host/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
)

type mockMessageEmitter struct {
e func(context.Context, string, map[string]string) error
e func(context.Context, any, map[string]string) error
labels map[string]string
}

func (m *mockMessageEmitter) Emit(ctx context.Context, msg string) error {
func (m *mockMessageEmitter) Emit(ctx context.Context, msg any) error {
return m.e(ctx, msg, m.labels)
}

Expand All @@ -41,7 +41,7 @@ func (m *mockMessageEmitter) Labels() map[string]string {
return m.labels
}

func newMockMessageEmitter(e func(context.Context, string, map[string]string) error) custmsg.MessageEmitter {
func newMockMessageEmitter(e func(context.Context, any, map[string]string) error) custmsg.MessageEmitter {
return &mockMessageEmitter{e: e}
}

Expand All @@ -65,7 +65,7 @@ func Test_createEmitFn(t *testing.T) {
emitFn := createEmitFn(
logger.Test(t),
store,
newMockMessageEmitter(func(ctx context.Context, _ string, _ map[string]string) error {
newMockMessageEmitter(func(ctx context.Context, _ any, _ map[string]string) error {
v := ctx.Value(ctxKey)
assert.Equal(t, ctxValue, v)
return nil
Expand Down Expand Up @@ -106,7 +106,7 @@ func Test_createEmitFn(t *testing.T) {
emitFn := createEmitFn(
logger.Test(t),
store,
newMockMessageEmitter(func(_ context.Context, _ string, _ map[string]string) error {
newMockMessageEmitter(func(_ context.Context, _ any, _ map[string]string) error {
return nil
}),
unsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) {
Expand Down Expand Up @@ -176,7 +176,7 @@ func Test_createEmitFn(t *testing.T) {
emitFn := createEmitFn(
logger.Test(t),
store,
newMockMessageEmitter(func(_ context.Context, _ string, _ map[string]string) error {
newMockMessageEmitter(func(_ context.Context, _ any, _ map[string]string) error {
return assert.AnError
}),
unsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/workflows/wasm/host/wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func Test_Compute_Emit(t *testing.T) {
Logger: lggr,
Fetch: fetchFunc,
IsUncompressed: true,
Labeler: newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error {
Labeler: newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error {
t.Helper()

v := ctx.Value(ctxKey)
Expand Down Expand Up @@ -301,7 +301,7 @@ func Test_Compute_Emit(t *testing.T) {
Logger: lggr,
Fetch: fetchFunc,
IsUncompressed: true,
Labeler: newMockMessageEmitter(func(_ context.Context, msg string, kvs map[string]string) error {
Labeler: newMockMessageEmitter(func(_ context.Context, msg any, kvs map[string]string) error {
t.Helper()

assert.Equal(t, "testing emit", msg)
Expand Down Expand Up @@ -343,7 +343,7 @@ func Test_Compute_Emit(t *testing.T) {
Logger: lggr,
Fetch: fetchFunc,
IsUncompressed: true,
Labeler: newMockMessageEmitter(func(_ context.Context, msg string, labels map[string]string) error {
Labeler: newMockMessageEmitter(func(_ context.Context, msg any, labels map[string]string) error {
return nil
}), // never called
}, binary)
Expand Down Expand Up @@ -1265,7 +1265,7 @@ func TestModule_MaxResponseSizeBytesLimit(t *testing.T) {
ctx := tests.Context(t)
binary := createTestBinary(emitBinaryCmd, emitBinaryLocation, true, t)

emitter := newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error {
emitter := newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error {
return errors.New("some error")
})
// an emitter response with an error "some error" when marshaled is 14 bytes
Expand Down Expand Up @@ -1301,7 +1301,7 @@ func TestModule_MaxResponseSizeBytesLimit(t *testing.T) {
ctx := tests.Context(t)
binary := createTestBinary(emitBinaryCmd, emitBinaryLocation, true, t)

emitter := newMockMessageEmitter(func(gotCtx context.Context, msg string, kvs map[string]string) error {
emitter := newMockMessageEmitter(func(gotCtx context.Context, msg any, kvs map[string]string) error {
return errors.New("some error")
})

Expand Down
Loading