diff --git a/pkg/engine/internal/executor/column.go b/pkg/engine/internal/executor/column.go
index b3b66381ac199..b5c980c8edd37 100644
--- a/pkg/engine/internal/executor/column.go
+++ b/pkg/engine/internal/executor/column.go
@@ -9,6 +9,7 @@ import (
     "github.com/apache/arrow-go/v18/arrow/memory"
 
     "github.com/grafana/loki/v3/pkg/engine/internal/types"
+    "github.com/grafana/loki/v3/pkg/logql/log"
 )
 
 func NewScalar(value types.Literal, rows int) arrow.Array {
@@ -54,16 +55,32 @@ func NewScalar(value types.Literal, rows int) arrow.Array {
         }
     case *array.ListBuilder:
         //TODO(twhitney): currently only supporting string list, but we can add more types here as we need them
-        value, ok := value.Any().([]string)
+        v, ok := value.Any().([]string)
         if !ok {
-            panic(fmt.Errorf("unsupported list literal type: %T", value))
-        }
-
-        valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
-        for range rows {
-            builder.Append(true)
-            for _, val := range value {
-                valueBuilder.Append(val)
+            v, ok := value.Any().([]log.LabelFmt)
+            if !ok {
+                panic(fmt.Errorf("unsupported list literal type: %T", v))
+            }
+            valueBuilder := builder.ValueBuilder().(*array.StructBuilder)
+            for range rows {
+                builder.Append(true)
+                for _, val := range v {
+                    nameBuilder := valueBuilder.FieldBuilder(0).(*array.StringBuilder)
+                    nameBuilder.Append(val.Name)
+                    valBuilder := valueBuilder.FieldBuilder(1).(*array.StringBuilder)
+                    valBuilder.Append(val.Value)
+                    renameBuilder := valueBuilder.FieldBuilder(2).(*array.BooleanBuilder)
+                    renameBuilder.Append(val.Rename)
+                    valueBuilder.Append(true)
+                }
+            }
+        } else {
+            valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
+            for range rows {
+                builder.Append(true)
+                for _, val := range v {
+                    valueBuilder.Append(val)
+                }
             }
         }
     }
diff --git a/pkg/engine/internal/executor/expressions.go b/pkg/engine/internal/executor/expressions.go
index abddc243eca32..c2af65fb235b3 100644
--- a/pkg/engine/internal/executor/expressions.go
+++ b/pkg/engine/internal/executor/expressions.go
@@ -136,9 +136,9 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.RecordBa
 
         fn, err := variadicFunctions.GetForSignature(expr.Op)
         if err != nil {
-            return nil, fmt.Errorf("failed to lookup unary function: %w", err)
+            return nil, fmt.Errorf("failed to lookup variadic function: %w", err)
         }
-        return fn.Evaluate(args...)
+        return fn.Evaluate(input, args...)
     }
 
     return nil, fmt.Errorf("unknown expression: %v", expr)
diff --git a/pkg/engine/internal/executor/functions.go b/pkg/engine/internal/executor/functions.go
index 7b666496fd784..4ce1ed93a3c8b 100644
--- a/pkg/engine/internal/executor/functions.go
+++ b/pkg/engine/internal/executor/functions.go
@@ -138,6 +138,8 @@ func init() {
     // Parse functions
     variadicFunctions.register(types.VariadicOpParseLogfmt, parseFn(types.VariadicOpParseLogfmt))
     variadicFunctions.register(types.VariadicOpParseJSON, parseFn(types.VariadicOpParseJSON))
+    variadicFunctions.register(types.VariadicOpParseLabelfmt, parseFn(types.VariadicOpParseLabelfmt))
+    variadicFunctions.register(types.VariadicOpParseLinefmt, parseFn(types.VariadicOpParseLinefmt))
 }
 
 type UnaryFunctionRegistry interface {
@@ -379,13 +381,13 @@ type VariadicFunctionRegistry interface {
 }
 
 type VariadicFunction interface {
-    Evaluate(args ...arrow.Array) (arrow.Array, error)
+    Evaluate(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error)
 }
 
-type VariadicFunctionFunc func(args ...arrow.Array) (arrow.Array, error)
+type VariadicFunctionFunc func(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error)
 
-func (f VariadicFunctionFunc) Evaluate(args ...arrow.Array) (arrow.Array, error) {
-    return f(args...)
+func (f VariadicFunctionFunc) Evaluate(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error) {
+    return f(input, args...)
 }
 
 type variadicFuncReg struct {
diff --git a/pkg/engine/internal/executor/parse.go b/pkg/engine/internal/executor/parse.go
index e054c5569496f..8c34e4e4e2d16 100644
--- a/pkg/engine/internal/executor/parse.go
+++ b/pkg/engine/internal/executor/parse.go
@@ -3,6 +3,7 @@ package executor
 import (
     "fmt"
     "sort"
+    "strconv"
     "unsafe"
 
     "github.com/apache/arrow-go/v18/arrow"
@@ -12,22 +13,48 @@ import (
     "github.com/grafana/loki/v3/pkg/engine/internal/semconv"
     "github.com/grafana/loki/v3/pkg/engine/internal/types"
     "github.com/grafana/loki/v3/pkg/engine/internal/util"
+    "github.com/grafana/loki/v3/pkg/logql/log"
 )
 
 func parseFn(op types.VariadicOp) VariadicFunction {
-    return VariadicFunctionFunc(func(args ...arrow.Array) (arrow.Array, error) {
-        sourceCol, requestedKeys, strict, keepEmpty, err := extractParseFnParameters(args)
-        if err != nil {
-            panic(err)
-        }
+    return VariadicFunctionFunc(func(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error) {
+        var (
+            sourceCol       *array.String
+            requestedKeys   []string
+            strict          bool
+            keepEmpty       bool
+            lineFmtTemplate string
+            labelFmts       []log.LabelFmt
+            err             error
+        )
 
         var headers []string
         var parsedColumns []arrow.Array
         switch op {
         case types.VariadicOpParseLogfmt:
-            headers, parsedColumns = buildLogfmtColumns(sourceCol, requestedKeys, strict, keepEmpty)
+            sourceCol, requestedKeys, strict, keepEmpty, err = extractParseFnParameters(args)
+            if err != nil {
+                panic(err)
+            }
+            headers, parsedColumns = buildLogfmtColumns(input, sourceCol, requestedKeys, strict, keepEmpty)
         case types.VariadicOpParseJSON:
-            headers, parsedColumns = buildJSONColumns(sourceCol, requestedKeys)
+            sourceCol, requestedKeys, _, _, err = extractParseFnParameters(args)
+            if err != nil {
+                panic(err)
+            }
+            headers, parsedColumns = buildJSONColumns(input, sourceCol, requestedKeys)
+        case types.VariadicOpParseLinefmt:
+            sourceCol, _, lineFmtTemplate, err = extractLineFmtParameters(args)
+            if err != nil {
+                panic(err)
+            }
+            headers, parsedColumns = buildLinefmtColumns(input, sourceCol, lineFmtTemplate)
+        case types.VariadicOpParseLabelfmt:
+            sourceCol, _, labelFmts, err = extractLabelFmtParameters(args)
+            if err != nil {
+                panic(err)
+            }
+            headers, parsedColumns = buildLabelfmtColumns(input, sourceCol, labelFmts)
         default:
             return nil, fmt.Errorf("unsupported parser kind: %v", op)
         }
@@ -36,9 +63,15 @@ func parseFn(op types.VariadicOp) VariadicFunction {
         newFields := make([]arrow.Field, 0, len(headers))
         for _, header := range headers {
             ct := types.ColumnTypeParsed
+            if op == types.VariadicOpParseLabelfmt {
+                ct = types.ColumnTypeLabel
+            }
             if header == semconv.ColumnIdentError.ShortName() || header == semconv.ColumnIdentErrorDetails.ShortName() {
                 ct = types.ColumnTypeGenerated
             }
+            if header == semconv.ColumnIdentMessage.ShortName() {
+                ct = types.ColumnTypeBuiltin
+            }
             ident := semconv.NewIdentifier(header, ct, types.Loki.String)
             newFields = append(newFields, semconv.FieldFromIdent(ident, true))
         }
@@ -51,6 +84,85 @@ func parseFn(op types.VariadicOp) VariadicFunction {
     })
 }
 
+func extractLineFmtParameters(args []arrow.Array) (*array.String, []string, string, error) {
+    if len(args) != 3 {
+        return nil, nil, "", fmt.Errorf("parse function expected 3 arguments, got %d", len(args))
+    }
+    var sourceColArr, templateArr arrow.Array
+    sourceColArr = args[0]
+    // requestedKeysArr = args[1] not needed
+    templateArr = args[2]
+
+    if sourceColArr == nil {
+        return nil, nil, "", fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
+    }
+
+    sourceCol, ok := sourceColArr.(*array.String)
+    if !ok {
+        return nil, nil, "", fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr)
+    }
+
+    stringArr, ok := templateArr.(*array.String)
+    if !ok {
+        return nil, nil, "", fmt.Errorf("template must be a string, got %T", templateArr)
+    }
+
+    var template string
+    templateIdx := 0
+    if (stringArr != nil) && (stringArr.Len() > 0) {
+        template = stringArr.Value(templateIdx)
+    }
+    return sourceCol, nil, template, nil
+
+}
+
+func extractLabelFmtParameters(args []arrow.Array) (*array.String, []string, []log.LabelFmt, error) {
+    if len(args) != 3 {
+        return nil, nil, nil, fmt.Errorf("parse function expected 3 arguments, got %d", len(args))
+    }
+    var sourceColArr, labelFmtArray arrow.Array
+    sourceColArr = args[0]
+    // requestedKeysArr = args[1] not needed
+    labelFmtArray = args[2]
+
+    if sourceColArr == nil {
+        return nil, nil, nil, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
+    }
+
+    sourceCol, ok := sourceColArr.(*array.String)
+    if !ok {
+        return nil, nil, nil, fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr)
+    }
+
+    listArr, ok := labelFmtArray.(*array.List)
+    if !ok {
+        return nil, nil, nil, fmt.Errorf("labelfmts array must be of type struct, got %T", labelFmtArray)
+    }
+    var labelFmts []log.LabelFmt
+    listValues := listArr.ListValues()
+    switch listValues := listValues.(type) {
+    case *array.Struct:
+        if sourceCol.Len() == 0 {
+            return sourceCol, nil, []log.LabelFmt{}, nil
+        }
+        // listValues will repeat one copy for each line of input; we want to only grab one copy
+        for i := 0; i < (listValues.Len() / sourceCol.Len()); i++ {
+            name := listValues.Field(0).ValueStr(i)
+            val := listValues.Field(1).ValueStr(i)
+            rename, err := strconv.ParseBool(listValues.Field(2).ValueStr(i))
+            if err != nil {
+                return nil, nil, nil, fmt.Errorf("wrong format for labelFmt value rename: error %v", err)
+            }
+            labelFmts = append(labelFmts, log.LabelFmt{Name: name, Value: val, Rename: rename})
+        }
+    default:
+        return nil, nil, nil, fmt.Errorf("unknown labelfmt type %T; expected *array.Struct", listValues)
+    }
+
+    return sourceCol, nil, labelFmts, nil
+
+}
+
 func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool, bool, error) {
     // Valid signatures:
     //   parse(sourceColVec, requestedKeys, strict, keepEmpty)
@@ -113,13 +225,13 @@ func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool
 }
 
 // parseFunc represents a function that parses a single line and returns key-value pairs
-type parseFunc func(line string) (map[string]string, error)
+type parseFunc func(recordRow arrow.RecordBatch, line string) (map[string]string, error)
 
 // buildColumns builds Arrow columns from input lines using the provided parser
 // Returns the column headers, the Arrow columns, and any error
-func buildColumns(input *array.String, _ []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) {
+func buildColumns(input arrow.RecordBatch, sourceCol *array.String, _ []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) {
     columnBuilders := make(map[string]*array.StringBuilder)
-    columnOrder := parseLines(input, columnBuilders, parseFunc, errorType)
+    columnOrder := parseLines(input, sourceCol, columnBuilders, parseFunc, errorType)
 
     // Build final arrays
     columns := make([]arrow.Array, 0, len(columnOrder))
@@ -135,14 +247,15 @@ func buildColumns(input *array.String, _ []string, parseFunc parseFunc, errorTyp
 }
 
 // parseLines discovers columns dynamically as lines are parsed
-func parseLines(input *array.String, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string {
+func parseLines(input arrow.RecordBatch, sourceCol *array.String, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string {
     columnOrder := []string{}
     var errorBuilder, errorDetailsBuilder *array.StringBuilder
     hasErrorColumns := false
 
-    for i := 0; i < input.Len(); i++ {
-        line := input.Value(i)
-        parsed, err := parseFunc(line)
+    for i := 0; i < sourceCol.Len(); i++ {
+        line := sourceCol.Value(i)
+        // pass the corresponding row of input as well
+        parsed, err := parseFunc(input.NewSlice(int64(i), int64(i+1)), line)
 
         // Handle error columns
         if err != nil {
diff --git a/pkg/engine/internal/executor/parse_json.go b/pkg/engine/internal/executor/parse_json.go
index 3a24d6f47fed0..4c0808d24f907 100644
--- a/pkg/engine/internal/executor/parse_json.go
+++ b/pkg/engine/internal/executor/parse_json.go
@@ -31,11 +31,11 @@ var (
     }
 )
 
-func buildJSONColumns(input *array.String, requestedKeys []string) ([]string, []arrow.Array) {
-    parseFunc := func(line string) (map[string]string, error) {
+func buildJSONColumns(input arrow.RecordBatch, sourceCol *array.String, requestedKeys []string) ([]string, []arrow.Array) {
+    parseFunc := func(_ arrow.RecordBatch, line string) (map[string]string, error) {
         return parseJSONLine(line, requestedKeys)
     }
-    return buildColumns(input, requestedKeys, parseFunc, types.JSONParserErrorType)
+    return buildColumns(input, sourceCol, requestedKeys, parseFunc, types.JSONParserErrorType)
 }
 
 // parseJSONLine parses a single JSON line and extracts key-value pairs
diff --git a/pkg/engine/internal/executor/parse_labelfmt.go b/pkg/engine/internal/executor/parse_labelfmt.go
new file mode 100644
index 0000000000000..264b9e4cc31cf
--- /dev/null
+++ b/pkg/engine/internal/executor/parse_labelfmt.go
@@ -0,0 +1,72 @@
+package executor
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/prometheus/prometheus/model/labels"
+
+    "github.com/grafana/loki/v3/pkg/engine/internal/semconv"
+    "github.com/grafana/loki/v3/pkg/engine/internal/types"
+    "github.com/grafana/loki/v3/pkg/logql/log"
+)
+
+func buildLabelfmtColumns(input arrow.RecordBatch, sourceCol *array.String, labelFmts []log.LabelFmt) ([]string, []arrow.Array) {
+    parseFunc := func(row arrow.RecordBatch, line string) (map[string]string, error) {
+        return tokenizeLabelfmt(row, line, labelFmts)
+    }
+    return buildColumns(input, sourceCol, nil, parseFunc, types.LabelfmtParserErrorType)
+}
+
+// tokenizeLabelfmt parses labelfmt input using the standard decoder
+// Returns a map of key-value pairs with first-wins semantics for duplicates
+func tokenizeLabelfmt(input arrow.RecordBatch, line string, labelFmts []log.LabelFmt) (map[string]string, error) {
+    decoder, err := log.NewLabelsFormatter(labelFmts)
+    if err != nil {
+        return nil, fmt.Errorf("unable to create label formatter with formats %v", labelFmts)
+    }
+    lbls := buildLabelsFromInput(input)
+    var builder = log.NewBaseLabelsBuilder().ForLabels(lbls, labels.StableHash(lbls))
+    builder.Reset()
+    builder.Add(log.StructuredMetadataLabel, buildLabelsFromInput(input))
+
+    var timestampIdx = -1
+    for i := 0; i < len(input.Columns()); i++ {
+        if input.ColumnName(i) == types.ColumnFullNameTimestamp {
+            timestampIdx = i
+            break
+        }
+    }
+    if timestampIdx < 0 {
+        return map[string]string{}, fmt.Errorf("unable to find timestamp column in inputs")
+    }
+    ts, err := time.Parse("2006-01-02T15:04:05.999999999Z", input.Column(timestampIdx).ValueStr(0))
+    if err != nil {
+        panic(fmt.Sprintf("Unable to convert timestamp %v", input.Column(timestampIdx).ValueStr(0)))
+    }
+
+    decoder.Process(ts.UnixNano(), unsafeBytes(line), builder)
+    result := builder.LabelsResult().Labels().Map()
+    // result includes every single label from the input, not just the ones from labelFmts.
+    // Remove the labels we don't care about and return only the new/adjusted labels.
+    var relevantLabels = map[string]bool{}
+    for _, label := range labelFmts {
+        relevantLabels[label.Name] = true
+    }
+    for labelName := range result {
+        if _, ok := relevantLabels[labelName]; !ok {
+            delete(result, labelName)
+        }
+    }
+    return result, nil
+}
+
+func buildLabelsFromInput(input arrow.RecordBatch) labels.Labels {
+    var labelList []labels.Label
+    for i := 0; i < int(input.NumCols()); i++ {
+        labelList = append(labelList, labels.Label{Name: semconv.MustParseFQN(input.ColumnName(i)).ColumnRef().Column, Value: input.Column(i).ValueStr(0)})
+    }
+    return labels.New(labelList...)
+}
diff --git a/pkg/engine/internal/executor/parse_labelfmt_test.go b/pkg/engine/internal/executor/parse_labelfmt_test.go
new file mode 100644
index 0000000000000..025fb73283fcc
--- /dev/null
+++ b/pkg/engine/internal/executor/parse_labelfmt_test.go
@@ -0,0 +1,94 @@
+package executor
+
+import (
+    "strconv"
+    "testing"
+    "testing/synctest"
+    "time"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/stretchr/testify/require"
+
+    "github.com/grafana/loki/v3/pkg/engine/internal/semconv"
+    "github.com/grafana/loki/v3/pkg/engine/internal/types"
+    "github.com/grafana/loki/v3/pkg/logql/log"
+)
+
+func TestTokenizeLabelFmt(t *testing.T) {
+    timestamp := time.Now().In(time.UTC)
+    schema := arrow.NewSchema(
+        []arrow.Field{
+            semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
+            semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
+            semconv.FieldFromIdent(semconv.NewIdentifier("namespace", types.ColumnTypeMetadata, types.Loki.String), false),
+        },
+        nil, // No metadata
+    )
+    tests := []struct {
+        name      string
+        line      string
+        timeVal   arrow.Timestamp
+        namespace string
+        fmts      []log.LabelFmt
+        want      map[string]string
+    }{
+        {
+            "simple rename",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.UnixNano()),
+            "dev",
+            []log.LabelFmt{{Name: "foo", Value: "namespace", Rename: true}},
+            map[string]string{"foo": "dev"},
+        },
+        {
+            "new label",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.UnixNano()),
+            "dev",
+            []log.LabelFmt{{Name: "logTimeNanos", Value: "{{ __timestamp__ | unixEpochNanos }}", Rename: false}},
+            map[string]string{"logTimeNanos": strconv.FormatInt(timestamp.UTC().UnixNano(), 10)},
+        },
+        {
+            "rename and label",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.Add(10 * time.Second).UTC().UnixNano()),
+            "dev",
+            []log.LabelFmt{{Name: "foo", Value: "namespace", Rename: true}, {Name: "logTimeNanos", Value: "{{ __timestamp__ | unixEpochNanos }}", Rename: false}},
+            map[string]string{"foo": "dev", "logTimeNanos": strconv.FormatInt(timestamp.Add(10*time.Second).UTC().UnixNano(), 10)},
+        },
+    }
+
+    for _, tt := range tests {
+        synctest.Test(t, func(t *testing.T) {
+            // Create builders for each column
+            logBuilder := array.NewStringBuilder(memory.DefaultAllocator)
+            tsBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
+            namespaceBuilder := array.NewStringBuilder(memory.DefaultAllocator)
+
+            // Append data to the builders
+            logs := make([]string, 1)
+            ts := make([]arrow.Timestamp, 1)
+            namespaces := make([]string, 1)
+
+            logs[0] = tt.line
+            ts[0] = tt.timeVal
+            namespaces[0] = tt.namespace
+
+            tsBuilder.AppendValues(ts, nil)
+            logBuilder.AppendValues(logs, nil)
+            namespaceBuilder.AppendValues(namespaces, nil)
+
+            // Build the arrays
+            logArray := logBuilder.NewArray()
+            tsArray := tsBuilder.NewArray()
+            namespaceArray := namespaceBuilder.NewArray()
+            columns := []arrow.Array{logArray, tsArray, namespaceArray}
+            recordBatch := array.NewRecordBatch(schema, columns, 1)
+            result, err := tokenizeLabelfmt(recordBatch, tt.line, tt.fmts)
+            require.NoError(t, err)
+            require.Equal(t, tt.want, result)
+        })
+    }
+}
diff --git a/pkg/engine/internal/executor/parse_linefmt.go b/pkg/engine/internal/executor/parse_linefmt.go
new file mode 100644
index 0000000000000..4e6af68b339d1
--- /dev/null
+++ b/pkg/engine/internal/executor/parse_linefmt.go
@@ -0,0 +1,140 @@
+package executor
+
+import (
+ "bytes" + "fmt" + "text/template" + "text/template/parse" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/logql/log" +) + +// parseFunc will be called once for each line, so we need to know which line of `input` corresponds to `line` +// also, sometimes input is a batch of 0 lines but we have a "line" string anyway? +func buildLinefmtColumns(input arrow.RecordBatch, sourceCol *array.String, lineFmt string) ([]string, []arrow.Array) { + parseFunc := func(row arrow.RecordBatch, line string) (map[string]string, error) { + return tokenizeLinefmt(row, line, lineFmt) + } + return buildColumns(input, sourceCol, nil, parseFunc, types.LabelfmtParserErrorType) +} + +// tokenizeLinefmt parses linefmt input using the standard decoder +// Returns a map of key-value pairs with first-wins semantics for duplicates +func tokenizeLinefmt(input arrow.RecordBatch, line string, lineFmt string) (map[string]string, error) { + result := make(map[string]string) + + formatter, err := NewFormatter(lineFmt) + if err != nil { + return nil, fmt.Errorf("unable to create line formatter with template %v", lineFmt) + } + if _, ok := formatter.Process(line, input, result); !ok { + return nil, fmt.Errorf("unable to process line %v", line) + } + return result, nil +} + +type LineFormatter struct { + *template.Template + buf *bytes.Buffer + + currentLine []byte + currentTs int64 + simpleKey string +} + +// NewFormatter creates a new log line formatter from a given text template. +func NewFormatter(tmpl string) (*LineFormatter, error) { + lf := &LineFormatter{ + buf: bytes.NewBuffer(make([]byte, 4096)), + } + + functions := log.AddLineAndTimestampFunctions(func() string { + return unsafeString(lf.currentLine) + }, func() int64 { + return lf.currentTs + }) + + t, err := template.New("line").Option("missingkey=zero").Funcs(functions).Parse(tmpl) + if err != nil { + return nil, fmt.Errorf("invalid line template: %w", err) + } + lf.Template = t + // determine if the template is a simple key substitution, e.g. line_format `{{.message}}` + // if it is, save the key name and we can use it later to directly copy the string + // bytes of the value to avoid copying and allocating a new string. 
+    if len(t.Root.Nodes) == 1 && t.Root.Nodes[0].Type() == parse.NodeAction {
+        actionNode := t.Root.Nodes[0].(*parse.ActionNode)
+        if len(actionNode.Pipe.Cmds) == 1 && len(actionNode.Pipe.Cmds[0].Args) == 1 {
+            if fieldNode, ok := actionNode.Pipe.Cmds[0].Args[0].(*parse.FieldNode); ok && len(fieldNode.Ident) == 1 {
+                lf.simpleKey = fieldNode.Ident[0]
+            }
+        }
+    }
+
+    return lf, nil
+}
+
+func (lf *LineFormatter) Process(line string, input arrow.RecordBatch, result map[string]string) (string, bool) {
+    var messageIdx = -1
+    for i := 0; i < len(input.Columns()); i++ {
+        colIdent := semconv.MustParseFQN(input.ColumnName(i)).ColumnRef().Column
+        if colIdent == "message" {
+            messageIdx = i
+            break
+        }
+    }
+    if messageIdx < 0 {
+        return "", false
+    }
+    if lf.simpleKey != "" {
+        var simpleKeyIdx = -1
+        for i := 0; i < len(input.Columns()); i++ {
+            colIdent := semconv.MustParseFQN(input.ColumnName(i)).ColumnRef().Column
+            if lf.simpleKey == colIdent {
+                simpleKeyIdx = i
+                break
+            }
+        }
+        if simpleKeyIdx < 0 {
+            return "", true
+        }
+        result[types.ColumnNameBuiltinMessage] = input.Column(simpleKeyIdx).ValueStr(0)
+        return input.Column(simpleKeyIdx).ValueStr(0), true
+    }
+    var timestampIdx = -1
+    for i := 0; i < len(input.Columns()); i++ {
+        if input.ColumnName(i) == types.ColumnFullNameTimestamp {
+            timestampIdx = i
+            break
+        }
+    }
+    if timestampIdx == -1 {
+        panic("Unable to find timestamp column in inputs")
+    }
+    lf.buf.Reset()
+    lf.currentLine = unsafeBytes(line)
+    ts, err := time.Parse("2006-01-02T15:04:05.999999999Z", input.Column(timestampIdx).ValueStr(0))
+    if err != nil {
+        panic(fmt.Sprintf("Unable to convert timestamp %v", input.Column(timestampIdx).ValueStr(0)))
+    }
+    lf.currentTs = ts.UnixNano()
+
+    m := make(map[string]string)
+    for i := 0; i < len(input.Columns()); i++ {
+        m[semconv.MustParseFQN(input.ColumnName(i)).ColumnRef().Column] = input.Column(i).ValueStr(0)
+    }
+
+    if err := lf.Execute(lf.buf, m); err != nil {
+        result[types.ColumnNameError] = "TemplateFormatErr " + err.Error()
+        return line, true
+    }
+    result[types.ColumnNameBuiltinMessage] = lf.buf.String()
+
+    return lf.buf.String(), true
+}
diff --git a/pkg/engine/internal/executor/parse_linefmt_test.go b/pkg/engine/internal/executor/parse_linefmt_test.go
new file mode 100644
index 0000000000000..d712928c20f0f
--- /dev/null
+++ b/pkg/engine/internal/executor/parse_linefmt_test.go
@@ -0,0 +1,96 @@
+package executor
+
+import (
+    "testing"
+    "time"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/stretchr/testify/require"
+
+    "github.com/grafana/loki/v3/pkg/engine/internal/semconv"
+    "github.com/grafana/loki/v3/pkg/engine/internal/types"
+)
+
+func TestLinefmtParser_Process(t *testing.T) {
+    timestamp := time.Now().In(time.UTC).Add(time.Duration(1) * time.Second)
+    schema := arrow.NewSchema(
+        []arrow.Field{
+            semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
+            semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
+            semconv.FieldFromIdent(semconv.NewIdentifier("namespace", types.ColumnTypeMetadata, types.Loki.String), false),
+        },
+        nil, // No metadata
+    )
+    tests := []struct {
+        name      string
+        line      string
+        timeVal   arrow.Timestamp
+        namespace string
+        lineFmt   string
+        want      string
+    }{
+        {
+            "simple replacement",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.UnixNano()),
+            "dev",
+            "{{.namespace}}",
+            "dev",
+        },
+        {
+            "replacement plus addition",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.UnixNano()),
+            "dev",
+            "{{.namespace}} foo",
+            "dev foo",
+        },
+        {
+            "timestamp",
+            "this is my test line of logs",
+            arrow.Timestamp(timestamp.UnixNano()),
+            "dev",
+            "{{.timestamp}} foo",
+            timestamp.In(time.UTC).Format("2006-01-02T15:04:05.999999999Z") + " foo",
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            // Create builders for each column
+            logBuilder := array.NewStringBuilder(memory.DefaultAllocator)
+            tsBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
+            namespaceBuilder := array.NewStringBuilder(memory.DefaultAllocator)
+
+            // Append data to the builders
+            logs := make([]string, 1)
+            ts := make([]arrow.Timestamp, 1)
+            namespaces := make([]string, 1)
+
+            logs[0] = tt.line
+            ts[0] = tt.timeVal
+            namespaces[0] = tt.namespace
+
+            tsBuilder.AppendValues(ts, nil)
+            logBuilder.AppendValues(logs, nil)
+            namespaceBuilder.AppendValues(namespaces, nil)
+
+            // Build the arrays
+            logArray := logBuilder.NewArray()
+            tsArray := tsBuilder.NewArray()
+            namespaceArray := namespaceBuilder.NewArray()
+            columns := []arrow.Array{logArray, tsArray, namespaceArray}
+            recordBatch := array.NewRecordBatch(schema, columns, 1)
+
+            parser, err := NewFormatter(tt.lineFmt)
+            require.NoError(t, err)
+            var result = map[string]string{}
+            output, _ := parser.Process(tt.line, recordBatch, result)
+            require.NoError(t, err)
+            require.Equal(t, tt.want, output)
+            require.Equal(t, tt.want, result["message"])
+        })
+    }
+}
diff --git a/pkg/engine/internal/executor/parse_logfmt.go b/pkg/engine/internal/executor/parse_logfmt.go
index dc67f287e695c..dc0a496e49800 100644
--- a/pkg/engine/internal/executor/parse_logfmt.go
+++ b/pkg/engine/internal/executor/parse_logfmt.go
@@ -8,11 +8,11 @@ import (
     "github.com/grafana/loki/v3/pkg/logql/log/logfmt"
 )
 
-func buildLogfmtColumns(input *array.String, requestedKeys []string, strict bool, keepEmpty bool) ([]string, []arrow.Array) {
-    parseFunc := func(line string) (map[string]string, error) {
+func buildLogfmtColumns(input arrow.RecordBatch, sourceCol *array.String, requestedKeys []string, strict bool, keepEmpty bool) ([]string, []arrow.Array) {
+    parseFunc := func(_ arrow.RecordBatch, line string) (map[string]string, error) {
         return tokenizeLogfmt(line, requestedKeys, strict, keepEmpty)
     }
-    return buildColumns(input, requestedKeys, parseFunc, types.LogfmtParserErrorType)
+    return buildColumns(input, sourceCol, requestedKeys, parseFunc, types.LogfmtParserErrorType)
 }
 
 // tokenizeLogfmt parses logfmt input using the standard decoder
diff --git a/pkg/engine/internal/executor/project.go b/pkg/engine/internal/executor/project.go
index ef2c22d1ef621..203469c05123a 100644
--- a/pkg/engine/internal/executor/project.go
+++ b/pkg/engine/internal/executor/project.go
@@ -75,7 +75,7 @@ func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *ex
 
     // Create EXPAND projection pipeline:
     // Keep all columns and expand the ones referenced in proj.Expressions.
-    // TODO: as implemented, epanding and keeping/dropping cannot happen in the same projection. Is this desired?
+    // TODO: as implemented, expanding and keeping/dropping cannot happen in the same projection. Is this desired?
if proj.All && proj.Expand && len(expandExprs) > 0 { return newExpandPipeline(expandExprs[0], evaluator, input, region) } @@ -157,6 +157,10 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrCasted.DataType()) } for i := range arrCasted.NumField() { + // If a field is included in the newly returned set of values, overwrite it by removing from the list of existing fields + if idx := idxOf(outputFields, structSchema.Field(i)); idx >= 0 { + outputFields, outputCols = removeIdx(outputFields, outputCols, idx) + } outputCols = append(outputCols, arrCasted.Field(i)) outputFields = append(outputFields, structSchema.Field(i)) } @@ -172,3 +176,18 @@ func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, return array.NewRecordBatch(outputSchema, outputCols, batch.NumRows()), nil }, region, input), nil } + +func idxOf(set []arrow.Field, entry arrow.Field) int { + for i := 0; i < len(set); i++ { + if set[i].Name == entry.Name && set[i].Type == entry.Type { + return i + } + } + return -1 +} + +func removeIdx(fields []arrow.Field, values []arrow.Array, idx int) ([]arrow.Field, []arrow.Array) { + outFields := append(fields[:idx], fields[idx+1:]...) + outValues := append(values[:idx], values[idx+1:]...) + return outFields, outValues +} diff --git a/pkg/engine/internal/planner/logical/builder.go b/pkg/engine/internal/planner/logical/builder.go index 5ce753dca493f..a2aeda568d54c 100644 --- a/pkg/engine/internal/planner/logical/builder.go +++ b/pkg/engine/internal/planner/logical/builder.go @@ -57,6 +57,23 @@ func (b *Builder) Parse(op types.VariadicOp, strict bool, keepEmpty bool) *Build return b.ProjectExpand(val) } +// Format applies a [Projection] operation to the Builder for a line_format or label_format instruction. +func (b *Builder) Format(op types.VariadicOp, Template Value) *Builder { + val := &FunctionOp{ + Op: op, + Values: []Value{ + // source column + &ColumnRef{ + Ref: semconv.ColumnIdentMessage.ColumnRef(), + }, + // nil for requested keys (to be filled in by projection pushdown optimizer) + NewLiteral([]string{}), + Template, + }, + } + return b.ProjectExpand(val) +} + // Cast applies an [Projection] operation, with an [UnaryOp] cast operation, to the Builder. 
 func (b *Builder) Cast(identifier string, op types.UnaryOp) *Builder {
     val := &UnaryOp{
diff --git a/pkg/engine/internal/planner/logical/planner.go b/pkg/engine/internal/planner/logical/planner.go
index 8ae35c2aa12d2..eb0d4bd15ab39 100644
--- a/pkg/engine/internal/planner/logical/planner.go
+++ b/pkg/engine/internal/planner/logical/planner.go
@@ -71,6 +71,10 @@ func buildPlanForLogQuery(
         logfmtStrict      bool
         logfmtKeepEmpty   bool
         hasJSONParser     bool
+        hasLinefmtParser  bool
+        hasLabelfmtParser bool
+        linefmtTemplate   Value
+        labelfmtTemplates Value
     )
 
     // TODO(chaudum): Implement a Walk function that can return an error
@@ -120,11 +124,13 @@ func buildPlanForLogQuery(
             err = errUnimplemented
             return false // do not traverse children
         case *syntax.LineFmtExpr:
-            err = unimplementedFeature("line_format")
-            return false // do not traverse children
+            hasLinefmtParser = true
+            linefmtTemplate = NewLiteral(e.Value)
+            return true
         case *syntax.LabelFmtExpr:
-            err = unimplementedFeature("label_format")
-            return false // do not traverse children
+            hasLabelfmtParser = true
+            labelfmtTemplates = NewLiteral(e.Formats)
+            return true
         case *syntax.KeepLabelsExpr:
             err = unimplementedFeature("keep")
             return false // do not traverse children
@@ -192,6 +198,16 @@ func buildPlanForLogQuery(
         // JSON has no parameters
         builder = builder.Parse(types.VariadicOpParseJSON, false, false)
     }
+    if hasLinefmtParser {
+        builder = builder.Format(types.VariadicOpParseLinefmt, linefmtTemplate)
+    }
+    if hasLabelfmtParser {
+        builder = builder.Format(types.VariadicOpParseLabelfmt, labelfmtTemplates)
+        replacedLabels := getReplacedLabels(labelfmtTemplates)
+        if len(replacedLabels) > 0 {
+            builder = builder.ProjectDrop(replacedLabels...)
+        }
+    }
     for _, value := range postParsePredicates {
         builder = builder.Select(value)
     }
@@ -479,6 +495,10 @@ func convertLineFilter(filter syntax.LineFilter) Value {
     }
 }
 
+func convertLineFormat(value string) Value {
+    return &UnaryOp{Op: types.UnaryOpParseLinefmt, Value: NewLiteral(value)}
+}
+
 func convertBinaryArithmeticOp(op string) types.BinaryOp {
     switch op {
     case syntax.OpTypeAdd:
@@ -517,6 +537,21 @@ func convertLineMatchType(op log.LineMatchType) types.BinaryOp {
     }
 }
 
+func getReplacedLabels(labels Value) []Value {
+    var replacedLabels = []Value{}
+    if fmtLabels, ok := labels.(*Literal).inner.(types.LabelFmtListLiteral); ok {
+        for _, label := range fmtLabels {
+            if label.Rename {
+                labelVal := NewColumnRef(label.Value, types.ColumnTypeLabel)
+                replacedLabels = append(replacedLabels, labelVal)
+            }
+        }
+    } else {
+        panic("invalid data type for label_format arguments; expected log.LabelFmt")
+    }
+    return replacedLabels
+}
+
 func timestampColumnRef() *ColumnRef {
     return NewColumnRef(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin)
 }
diff --git a/pkg/engine/internal/planner/logical/planner_test.go b/pkg/engine/internal/planner/logical/planner_test.go
index 9ecdab5fb3ffe..b0c205df3ab59 100644
--- a/pkg/engine/internal/planner/logical/planner_test.go
+++ b/pkg/engine/internal/planner/logical/planner_test.go
@@ -283,9 +283,11 @@ func TestCanExecuteQuery(t *testing.T) {
         },
         {
             statement: `{env="prod"} | line_format "{.cluster}"`,
+            expected:  true,
         },
         {
             statement: `{env="prod"} | label_format cluster="us"`,
+            expected:  true,
         },
         {
             statement: `{env="prod"} |= "metric.go" | retry > 2`,
diff --git a/pkg/engine/internal/planner/physical/expressions.go b/pkg/engine/internal/planner/physical/expressions.go
index bfcd27bafd4aa..5c56c0a48f86d 100644
--- a/pkg/engine/internal/planner/physical/expressions.go
+++ b/pkg/engine/internal/planner/physical/expressions.go
@@ -226,7 +226,7 @@ type VariadicExpr struct {
     // Op is the function operation to apply to the parameters
     Op types.VariadicOp
 
-    // Expressions are the parameters paaws to the function
+    // Expressions are the parameters passed to the function
     Expressions []Expression
 }
 
diff --git a/pkg/engine/internal/planner/physical/plan.go b/pkg/engine/internal/planner/physical/plan.go
index 4682fb797bd2c..a1c4f474a23bd 100644
--- a/pkg/engine/internal/planner/physical/plan.go
+++ b/pkg/engine/internal/planner/physical/plan.go
@@ -20,7 +20,6 @@ const (
     NodeTypeRangeAggregation
     NodeTypeVectorAggregation
     NodeTypeMerge
-    NodeTypeParse
     NodeTypeCompat
     NodeTypeTopK
     NodeTypeParallelize
@@ -46,8 +45,6 @@ func (t NodeType) String() string {
         return "RangeAggregation"
     case NodeTypeVectorAggregation:
         return "VectorAggregation"
-    case NodeTypeParse:
-        return "Parse"
     case NodeTypeCompat:
         return "Compat"
     case NodeTypeTopK:
diff --git a/pkg/engine/internal/planner/physical/planner.go b/pkg/engine/internal/planner/physical/planner.go
index 9ea6d1b8bf1f7..1f2803839addf 100644
--- a/pkg/engine/internal/planner/physical/planner.go
+++ b/pkg/engine/internal/planner/physical/planner.go
@@ -340,7 +340,8 @@ func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node,
     for i := range lp.Expressions {
         expressions[i] = p.convertPredicate(lp.Expressions[i])
         if funcExpr, ok := lp.Expressions[i].(*logical.FunctionOp); ok {
-            if funcExpr.Op == types.VariadicOpParseJSON || funcExpr.Op == types.VariadicOpParseLogfmt {
+            if slices.Contains([]types.VariadicOp{types.VariadicOpParseJSON, types.VariadicOpParseLogfmt,
+                types.VariadicOpParseLinefmt, types.VariadicOpParseLabelfmt}, funcExpr.Op) {
                 needsCompat = true
             }
         }
@@ -467,7 +468,7 @@ func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *C
 //   - acc: currently accumulated expression.
 //   - input: a physical plan node of the only input of that expression, if any.
 //   - inputRef: a pointer to a node in `acc` that refers the input, if any. This is for convenience of
-//     renaming the column refenrece without a need to search for it in `acc` expression.
+//     renaming the column reference without a need to search for it in `acc` expression.
 //   - err: error
 func (p *Planner) collapseMathExpressions(lp logical.Value, rootNode bool, ctx *Context) (acc Expression, input Node, inputRef *ColumnExpr, err error) {
     switch v := lp.(type) {
diff --git a/pkg/engine/internal/types/column.go b/pkg/engine/internal/types/column.go
index cfd2caf143e5a..0a0f66d37d745 100644
--- a/pkg/engine/internal/types/column.go
+++ b/pkg/engine/internal/types/column.go
@@ -41,6 +41,10 @@ const (
     MetadataKeyColumnDataType = "column_datatype"
 )
 
+const (
+    ColumnFullNameTimestamp = "timestamp_ns.builtin.timestamp"
+)
+
 // Names of error columns
 const (
     ColumnNameError = "__error__"
@@ -50,6 +54,7 @@
 // Error types.
 const (
     LogfmtParserErrorType     = "LogfmtParserErr"
+    LabelfmtParserErrorType   = "LabelfmtParserErr"
     JSONParserErrorType       = "JSONParserErr"
     SampleExtractionErrorType = "SampleExtractionErr"
 )
diff --git a/pkg/engine/internal/types/literal.go b/pkg/engine/internal/types/literal.go
index ac10e795b56ce..e1f44e1100bc5 100644
--- a/pkg/engine/internal/types/literal.go
+++ b/pkg/engine/internal/types/literal.go
@@ -10,6 +10,7 @@ import (
     "github.com/dustin/go-humanize"
 
     "github.com/grafana/loki/v3/pkg/engine/internal/util"
+    "github.com/grafana/loki/v3/pkg/logql/log"
 )
 
 type NullLiteral struct{}
@@ -204,6 +205,33 @@ func (l StringListLiteral) Value() []string {
     return l
 }
 
+type LabelFmtListLiteral []log.LabelFmt
+
+func (l LabelFmtListLiteral) String() string {
+    tmp := "["
+    for _, f := range l {
+        tmp = tmp + "{Name:\"" + f.Name + "\", Value:\"" + f.Value + "\", Rename:" + strconv.FormatBool(f.Rename) + "}"
+    }
+    tmp += "]"
+    return tmp
+}
+func (l LabelFmtListLiteral) Any() LiteralType {
+    return l.Value()
+}
+func (l LabelFmtListLiteral) Value() []log.LabelFmt {
+    return l
+}
+
+// Type implements Literal.
+func (l LabelFmtListLiteral) Type() DataType {
+    tList := tList{
+        arrowType: arrow.ListOf(arrow.StructOf([]arrow.Field{{Name: "Name", Type: arrow.BinaryTypes.String},
+            {Name: "Value", Type: arrow.BinaryTypes.String},
+            {Name: "Rename", Type: arrow.FixedWidthTypes.Boolean}}...)),
+    }
+    return tList
+}
+
 // Literal is holds a value of [any] typed as [DataType].
 type Literal interface {
     fmt.Stringer
@@ -221,24 +249,26 @@ type TypedLiteral[T LiteralType] interface {
 }
 
 var (
-    _ Literal                = (*NullLiteral)(nil)
-    _ TypedLiteral[Null]     = (*NullLiteral)(nil)
-    _ Literal                = (*BoolLiteral)(nil)
-    _ TypedLiteral[bool]     = (*BoolLiteral)(nil)
-    _ Literal                = (*StringLiteral)(nil)
-    _ TypedLiteral[string]   = (*StringLiteral)(nil)
-    _ Literal                = (*IntegerLiteral)(nil)
-    _ TypedLiteral[int64]    = (*IntegerLiteral)(nil)
-    _ Literal                = (*FloatLiteral)(nil)
-    _ TypedLiteral[float64]  = (*FloatLiteral)(nil)
-    _ Literal                = (*TimestampLiteral)(nil)
-    _ TypedLiteral[Timestamp] = (*TimestampLiteral)(nil)
-    _ Literal                = (*DurationLiteral)(nil)
-    _ TypedLiteral[Duration] = (*DurationLiteral)(nil)
-    _ Literal                = (*BytesLiteral)(nil)
-    _ TypedLiteral[Bytes]    = (*BytesLiteral)(nil)
-    _ Literal                = (*StringListLiteral)(nil)
-    _ TypedLiteral[[]string] = (*StringListLiteral)(nil)
+    _ Literal                       = (*NullLiteral)(nil)
+    _ TypedLiteral[Null]            = (*NullLiteral)(nil)
+    _ Literal                       = (*BoolLiteral)(nil)
+    _ TypedLiteral[bool]            = (*BoolLiteral)(nil)
+    _ Literal                       = (*StringLiteral)(nil)
+    _ TypedLiteral[string]          = (*StringLiteral)(nil)
+    _ Literal                       = (*IntegerLiteral)(nil)
+    _ TypedLiteral[int64]           = (*IntegerLiteral)(nil)
+    _ Literal                       = (*FloatLiteral)(nil)
+    _ TypedLiteral[float64]         = (*FloatLiteral)(nil)
+    _ Literal                       = (*TimestampLiteral)(nil)
+    _ TypedLiteral[Timestamp]       = (*TimestampLiteral)(nil)
+    _ Literal                       = (*DurationLiteral)(nil)
+    _ TypedLiteral[Duration]        = (*DurationLiteral)(nil)
+    _ Literal                       = (*BytesLiteral)(nil)
+    _ TypedLiteral[Bytes]           = (*BytesLiteral)(nil)
+    _ Literal                       = (*StringListLiteral)(nil)
+    _ TypedLiteral[[]string]        = (*StringListLiteral)(nil)
+    _ Literal                       = (*LabelFmtListLiteral)(nil)
+    _ TypedLiteral[[]log.LabelFmt]  = (*LabelFmtListLiteral)(nil)
 )
 
 func NewLiteral(value any) Literal {
@@ -265,9 +295,10 @@ func NewLiteral(value any) Literal {
         return BytesLiteral(val)
     case []string:
         return StringListLiteral(val)
-    default:
-        panic(fmt.Sprintf("invalid literal value type %T", value))
+    case []log.LabelFmt:
+        return LabelFmtListLiteral(val)
     }
+    panic(fmt.Sprintf("invalid literal value type %T", value))
 }
 
 func NewNullLiteral() NullLiteral {
diff --git a/pkg/engine/internal/types/operators.go b/pkg/engine/internal/types/operators.go
index ee2da848d5e20..c62380ef084c0 100644
--- a/pkg/engine/internal/types/operators.go
+++ b/pkg/engine/internal/types/operators.go
@@ -17,6 +17,7 @@ const (
     UnaryOpCastFloat    // Cast string to float value operation (unwrap).
     UnaryOpCastBytes    // Cast string bytes to float value operation (unwrap).
     UnaryOpCastDuration // Cast string duration to float value operation (unwrap).
+    UnaryOpParseLinefmt // Parse linefmt line
 )
 
 // String returns the string representation of the UnaryOp.
@@ -129,8 +130,10 @@ const (
     // VariadicOpKindInvalid indicates an invalid unary operation.
     VariadicOpInvalid VariadicOp = iota
 
-    VariadicOpParseLogfmt // Parse logfmt line to set of columns operation (logfmt).
-    VariadicOpParseJSON   // Parse JSON line to set of columns operation (json).
+    VariadicOpParseLogfmt   // Parse logfmt line to set of columns operation (logfmt).
+    VariadicOpParseJSON     // Parse JSON line to set of columns operation (json).
+    VariadicOpParseLinefmt  // Parse linefmt line
+    VariadicOpParseLabelfmt // Parse labelfmt line to set of labels operation (labelfmt).
 )
 
 // String returns the string representation of the UnaryOp.
@@ -140,6 +143,10 @@ func (t VariadicOp) String() string {
         return "PARSE_LOGFMT"
     case VariadicOpParseJSON:
         return "PARSE_JSON"
+    case VariadicOpParseLinefmt:
+        return "PARSE_LINEFMT"
+    case VariadicOpParseLabelfmt:
+        return "PARSE_LABELFMT"
     default:
         panic(fmt.Sprintf("unknown variadic operator %d", t))
     }
diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go
index ad6fb13a8c979..61378c878e052 100644
--- a/pkg/logql/log/fmt.go
+++ b/pkg/logql/log/fmt.go
@@ -119,7 +119,7 @@ var (
     }
 )
 
-func addLineAndTimestampFunctions(currLine func() string, currTimestamp func() int64) map[string]interface{} {
+func AddLineAndTimestampFunctions(currLine func() string, currTimestamp func() int64) map[string]interface{} {
     functions := make(map[string]interface{}, len(functionMap)+2)
     for k, v := range functionMap {
         functions[k] = v
@@ -203,7 +203,7 @@ func NewFormatter(tmpl string) (*LineFormatter, error) {
         buf: bytes.NewBuffer(make([]byte, 4096)),
     }
 
-    functions := addLineAndTimestampFunctions(func() string {
+    functions := AddLineAndTimestampFunctions(func() string {
         return unsafeGetString(lf.currentLine)
     }, func() int64 {
         return lf.currentTs
@@ -367,7 +367,7 @@ func NewLabelsFormatter(fmts []LabelFmt) (*LabelsFormatter, error) {
         buf: bytes.NewBuffer(make([]byte, 1024)),
     }
 
-    functions := addLineAndTimestampFunctions(func() string {
+    functions := AddLineAndTimestampFunctions(func() string {
         return unsafeGetString(lf.currentLine)
     }, func() int64 {
         return lf.currentTs