From f47f02a765a23cdbbaa60b3ef2545406939c1a7f Mon Sep 17 00:00:00 2001 From: Rafal Strzalinski Date: Thu, 26 Jun 2025 12:00:19 +0200 Subject: [PATCH 1/3] WIP --- platform/logger/log_sender.go | 56 +++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/platform/logger/log_sender.go b/platform/logger/log_sender.go index 615527cd9..48478b799 100644 --- a/platform/logger/log_sender.go +++ b/platform/logger/log_sender.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" "github.com/QuesmaOrg/quesma/platform/telemetry/headers" + "github.com/goccy/go-json" + "log" "net/http" "net/url" "strconv" @@ -22,6 +24,45 @@ type LogSender struct { httpClient *http.Client } +func (logSender *LogSender) cutMessage(msg []byte) ([]byte, bool) { + + var msgMap map[string]any + if err := json.Unmarshal(msg, &msgMap); err != nil { + + return nil, false // if not a valid JSON, return as is + } + + message, ok := msgMap["message"].(string) + + if !ok { + return nil, false // if "message" key is not present or not a string, return as is + } + + cutMark := "..." + newLine := "\n" + charToCut := len(message) + len(cutMark) + len(newLine) - cap(logSender.LogBuffer) + + log.Println("XXXX LogSender: cutting message to fit buffer, charToCut:", charToCut, "message length:", len(message), "buffer capacity:", cap(logSender.LogBuffer)) + + // cutting the message will not help, let's drop it + if charToCut < 0 { + return nil, false // if buffer has enough space, return original message + } + + if charToCut < len(message) { + msgCut := message[:len(message)-charToCut] + msgMap["message"] = msgCut + cutMark + } else { + msgMap["message"] = message + } + + trimmedMsg, err := json.Marshal(msgMap) + if err != nil { + return nil, false // if marshalling fails, return as is + } + return append(trimmedMsg, newLine...), ok // append newline to maintain log format +} + func (logSender *LogSender) EatLogMessage(msg []byte) struct { bufferLengthCondition bool timeCondition bool @@ -38,14 +79,17 @@ func (logSender *LogSender) EatLogMessage(msg []byte) struct { } else { addedBefore := false if !bufferLengthCondition && len(logSender.LogBuffer) == 0 { // msg longer than buffer, let's cut it - cutMark := []byte("...\n") - charToCut := len(msg) + len(cutMark) - cap(logSender.LogBuffer) - if charToCut < len(msg) { - msgCut := msg[:len(msg)-charToCut] - logSender.LogBuffer = append(logSender.LogBuffer, msgCut...) - logSender.LogBuffer = append(logSender.LogBuffer, cutMark...) + + // we can the message part + trimmedMsg, ok := logSender.cutMessage(msg) + if ok { + log.Println("XXXX LogSender: message was too long, cutting it to fit buffer") + logSender.LogBuffer = append(logSender.LogBuffer, trimmedMsg...) addedBefore = true + } else { + log.Println("XXXX LogSender: message was too long, but could not cut it, dropping it") } + } else if len(logSender.LogBuffer)+len(msg) <= cap(logSender.LogBuffer) { // still fits in buffer logSender.LogBuffer = append(logSender.LogBuffer, msg...) addedBefore = true From c48b8431dd30f67a6c35ea1361c0e68261761ce9 Mon Sep 17 00:00:00 2001 From: Rafal Strzalinski Date: Thu, 26 Jun 2025 12:04:02 +0200 Subject: [PATCH 2/3] Keep it simple --- platform/logger/log_sender.go | 56 ++++------------------------------- 1 file changed, 6 insertions(+), 50 deletions(-) diff --git a/platform/logger/log_sender.go b/platform/logger/log_sender.go index 48478b799..46d48bb61 100644 --- a/platform/logger/log_sender.go +++ b/platform/logger/log_sender.go @@ -7,8 +7,6 @@ import ( "errors" "fmt" "github.com/QuesmaOrg/quesma/platform/telemetry/headers" - "github.com/goccy/go-json" - "log" "net/http" "net/url" "strconv" @@ -24,45 +22,6 @@ type LogSender struct { httpClient *http.Client } -func (logSender *LogSender) cutMessage(msg []byte) ([]byte, bool) { - - var msgMap map[string]any - if err := json.Unmarshal(msg, &msgMap); err != nil { - - return nil, false // if not a valid JSON, return as is - } - - message, ok := msgMap["message"].(string) - - if !ok { - return nil, false // if "message" key is not present or not a string, return as is - } - - cutMark := "..." - newLine := "\n" - charToCut := len(message) + len(cutMark) + len(newLine) - cap(logSender.LogBuffer) - - log.Println("XXXX LogSender: cutting message to fit buffer, charToCut:", charToCut, "message length:", len(message), "buffer capacity:", cap(logSender.LogBuffer)) - - // cutting the message will not help, let's drop it - if charToCut < 0 { - return nil, false // if buffer has enough space, return original message - } - - if charToCut < len(message) { - msgCut := message[:len(message)-charToCut] - msgMap["message"] = msgCut + cutMark - } else { - msgMap["message"] = message - } - - trimmedMsg, err := json.Marshal(msgMap) - if err != nil { - return nil, false // if marshalling fails, return as is - } - return append(trimmedMsg, newLine...), ok // append newline to maintain log format -} - func (logSender *LogSender) EatLogMessage(msg []byte) struct { bufferLengthCondition bool timeCondition bool @@ -80,15 +39,12 @@ func (logSender *LogSender) EatLogMessage(msg []byte) struct { addedBefore := false if !bufferLengthCondition && len(logSender.LogBuffer) == 0 { // msg longer than buffer, let's cut it - // we can the message part - trimmedMsg, ok := logSender.cutMessage(msg) - if ok { - log.Println("XXXX LogSender: message was too long, cutting it to fit buffer") - logSender.LogBuffer = append(logSender.LogBuffer, trimmedMsg...) - addedBefore = true - } else { - log.Println("XXXX LogSender: message was too long, but could not cut it, dropping it") - } + // Keep it simple. Drop the message if it is too long. + // + // We were cutting the message before, but it leads to broken JSONs. These JSONs where dropped by the collector + // afterward. + // + addedBefore = true } else if len(logSender.LogBuffer)+len(msg) <= cap(logSender.LogBuffer) { // still fits in buffer logSender.LogBuffer = append(logSender.LogBuffer, msg...) From 8477647b575100982baea054856804325dc0975c Mon Sep 17 00:00:00 2001 From: Rafal Strzalinski Date: Thu, 26 Jun 2025 13:27:37 +0200 Subject: [PATCH 3/3] Fixed test --- platform/logger/log_sender_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platform/logger/log_sender_test.go b/platform/logger/log_sender_test.go index e93a34df4..20ecab243 100644 --- a/platform/logger/log_sender_test.go +++ b/platform/logger/log_sender_test.go @@ -85,7 +85,7 @@ func TestLogSenderSmallBuffer(t *testing.T) { const URL = "http://localhost:8091" const LOG_MESSAGE = "log message" barrier := &sync.WaitGroup{} - barrier.Add(ITERATIONS) + barrier.Add(0) // all messages will be dropped handler := &Handler{barrier: barrier} go startHttpServer(handler, ":8091") @@ -98,7 +98,7 @@ func TestLogSenderSmallBuffer(t *testing.T) { assert.Equal(t, true, result.timeCondition) } barrier.Wait() - assert.Equal(t, int(handler.counter.Load()), BUFFER_SIZE*ITERATIONS) + assert.Equal(t, int(handler.counter.Load()), 0) } func TestLogSenderSmallElapsedTime(t *testing.T) {