Skip to content
35 changes: 26 additions & 9 deletions pkg/engine/internal/executor/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"

"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logql/log"
)

func NewScalar(value types.Literal, rows int) arrow.Array {
Expand Down Expand Up @@ -54,16 +55,32 @@ func NewScalar(value types.Literal, rows int) arrow.Array {
}
case *array.ListBuilder:
//TODO(twhitney): currently only supporting string list, but we can add more types here as we need them
value, ok := value.Any().([]string)
v, ok := value.Any().([]string)
if !ok {
panic(fmt.Errorf("unsupported list literal type: %T", value))
}

valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
for range rows {
builder.Append(true)
for _, val := range value {
valueBuilder.Append(val)
v, ok := value.Any().([]log.LabelFmt)
if !ok {
panic(fmt.Errorf("unsupported list literal type: %T", v))
}
valueBuilder := builder.ValueBuilder().(*array.StructBuilder)
for range rows {
builder.Append(true)
for _, val := range v {
nameBuilder := valueBuilder.FieldBuilder(0).(*array.StringBuilder)
nameBuilder.Append(val.Name)
valBuilder := valueBuilder.FieldBuilder(1).(*array.StringBuilder)
valBuilder.Append(val.Value)
renameBuilder := valueBuilder.FieldBuilder(2).(*array.BooleanBuilder)
renameBuilder.Append(val.Rename)
valueBuilder.Append(true)
}
}
} else {
valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
for range rows {
builder.Append(true)
for _, val := range v {
valueBuilder.Append(val)
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/internal/executor/expressions.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.RecordBa

fn, err := variadicFunctions.GetForSignature(expr.Op)
if err != nil {
return nil, fmt.Errorf("failed to lookup unary function: %w", err)
return nil, fmt.Errorf("failed to lookup variadic function: %w", err)
}
return fn.Evaluate(args...)
return fn.Evaluate(input, args...)
}

return nil, fmt.Errorf("unknown expression: %v", expr)
Expand Down
10 changes: 6 additions & 4 deletions pkg/engine/internal/executor/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func init() {
// Parse functions
variadicFunctions.register(types.VariadicOpParseLogfmt, parseFn(types.VariadicOpParseLogfmt))
variadicFunctions.register(types.VariadicOpParseJSON, parseFn(types.VariadicOpParseJSON))
variadicFunctions.register(types.VariadicOpParseLabelfmt, parseFn(types.VariadicOpParseLabelfmt))
variadicFunctions.register(types.VariadicOpParseLinefmt, parseFn(types.VariadicOpParseLinefmt))
}

type UnaryFunctionRegistry interface {
Expand Down Expand Up @@ -379,13 +381,13 @@ type VariadicFunctionRegistry interface {
}

type VariadicFunction interface {
Evaluate(args ...arrow.Array) (arrow.Array, error)
Evaluate(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error)
}

type VariadicFunctionFunc func(args ...arrow.Array) (arrow.Array, error)
type VariadicFunctionFunc func(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error)

func (f VariadicFunctionFunc) Evaluate(args ...arrow.Array) (arrow.Array, error) {
return f(args...)
func (f VariadicFunctionFunc) Evaluate(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error) {
return f(input, args...)
}

type variadicFuncReg struct {
Expand Down
141 changes: 127 additions & 14 deletions pkg/engine/internal/executor/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executor
import (
"fmt"
"sort"
"strconv"
"unsafe"

"github.com/apache/arrow-go/v18/arrow"
Expand All @@ -12,22 +13,48 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/util"
"github.com/grafana/loki/v3/pkg/logql/log"
)

func parseFn(op types.VariadicOp) VariadicFunction {
return VariadicFunctionFunc(func(args ...arrow.Array) (arrow.Array, error) {
sourceCol, requestedKeys, strict, keepEmpty, err := extractParseFnParameters(args)
if err != nil {
panic(err)
}
return VariadicFunctionFunc(func(input arrow.RecordBatch, args ...arrow.Array) (arrow.Array, error) {
var (
sourceCol *array.String
requestedKeys []string
strict bool
keepEmpty bool
lineFmtTemplate string
labelFmts []log.LabelFmt
err error
)

var headers []string
var parsedColumns []arrow.Array
switch op {
case types.VariadicOpParseLogfmt:
headers, parsedColumns = buildLogfmtColumns(sourceCol, requestedKeys, strict, keepEmpty)
sourceCol, requestedKeys, strict, keepEmpty, err = extractParseFnParameters(args)
if err != nil {
panic(err)
}
headers, parsedColumns = buildLogfmtColumns(input, sourceCol, requestedKeys, strict, keepEmpty)
case types.VariadicOpParseJSON:
headers, parsedColumns = buildJSONColumns(sourceCol, requestedKeys)
sourceCol, requestedKeys, _, _, err = extractParseFnParameters(args)
if err != nil {
panic(err)
}
headers, parsedColumns = buildJSONColumns(input, sourceCol, requestedKeys)
case types.VariadicOpParseLinefmt:
sourceCol, _, lineFmtTemplate, err = extractLineFmtParameters(args)
if err != nil {
panic(err)
}
headers, parsedColumns = buildLinefmtColumns(input, sourceCol, lineFmtTemplate)
case types.VariadicOpParseLabelfmt:
sourceCol, _, labelFmts, err = extractLabelFmtParameters(args)
if err != nil {
panic(err)
}
headers, parsedColumns = buildLabelfmtColumns(input, sourceCol, labelFmts)
default:
return nil, fmt.Errorf("unsupported parser kind: %v", op)
}
Expand All @@ -36,9 +63,15 @@ func parseFn(op types.VariadicOp) VariadicFunction {
newFields := make([]arrow.Field, 0, len(headers))
for _, header := range headers {
ct := types.ColumnTypeParsed
if op == types.VariadicOpParseLabelfmt {
ct = types.ColumnTypeLabel
}
if header == semconv.ColumnIdentError.ShortName() || header == semconv.ColumnIdentErrorDetails.ShortName() {
ct = types.ColumnTypeGenerated
}
if header == semconv.ColumnIdentMessage.ShortName() {
ct = types.ColumnTypeBuiltin
}
ident := semconv.NewIdentifier(header, ct, types.Loki.String)
newFields = append(newFields, semconv.FieldFromIdent(ident, true))
}
Expand All @@ -51,6 +84,85 @@ func parseFn(op types.VariadicOp) VariadicFunction {
})
}

// extractLineFmtParameters unpacks the arguments of a linefmt parse call.
//
// args must contain exactly three arrays:
//   - args[0]: the source column, which must be a string array;
//   - args[1]: the requested keys, which this function ignores;
//   - args[2]: the template, which must be a string array. Only its first
//     element is read; an empty array yields an empty template.
//
// It returns the source column, a nil key list and the template string, or an
// error when the argument count or one of the array types is wrong.
func extractLineFmtParameters(args []arrow.Array) (*array.String, []string, string, error) {
	if len(args) != 3 {
		return nil, nil, "", fmt.Errorf("parse function expected 3 arguments, got %d", len(args))
	}

	// args[1] holds the requested keys, which are not needed here.
	source, tmpl := args[0], args[2]
	if source == nil {
		return nil, nil, "", fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
	}

	sourceCol, isString := source.(*array.String)
	if !isString {
		return nil, nil, "", fmt.Errorf("parse can only operate on string column types, got %T", source)
	}

	tmplCol, isString := tmpl.(*array.String)
	if !isString {
		return nil, nil, "", fmt.Errorf("template must be a string, got %T", tmpl)
	}

	// A typed-nil or empty template array leaves the template empty.
	if tmplCol == nil || tmplCol.Len() == 0 {
		return sourceCol, nil, "", nil
	}
	return sourceCol, nil, tmplCol.Value(0), nil
}

// extractLabelFmtParameters unpacks the arguments of a labelfmt parse call.
//
// args must contain exactly three arrays:
//   - args[0]: the source column, which must be a string array;
//   - args[1]: the requested keys, which this function ignores;
//   - args[2]: a list array whose values are structs read by position as
//     (name, value, rename).
//
// It returns the source column, a nil key list and the decoded label formats.
// An error is returned when the argument count or an array type is wrong, when
// the struct has fewer than three fields, or when a rename flag cannot be
// parsed as a boolean. An empty source column yields an empty, non-nil slice
// of label formats.
func extractLabelFmtParameters(args []arrow.Array) (*array.String, []string, []log.LabelFmt, error) {
	if len(args) != 3 {
		return nil, nil, nil, fmt.Errorf("parse function expected 3 arguments, got %d", len(args))
	}

	// args[1] holds the requested keys, which are not needed here.
	sourceColArr, labelFmtArray := args[0], args[2]
	if sourceColArr == nil {
		return nil, nil, nil, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
	}

	sourceCol, ok := sourceColArr.(*array.String)
	if !ok {
		return nil, nil, nil, fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr)
	}

	listArr, ok := labelFmtArray.(*array.List)
	if !ok {
		return nil, nil, nil, fmt.Errorf("labelfmts array must be of type list, got %T", labelFmtArray)
	}

	listValues := listArr.ListValues()
	structs, ok := listValues.(*array.Struct)
	if !ok {
		return nil, nil, nil, fmt.Errorf("unknown labelfmt type %T; expected *array.Struct", listValues)
	}

	if sourceCol.Len() == 0 {
		return sourceCol, nil, []log.LabelFmt{}, nil
	}

	// Guard the positional field access below so a malformed struct is
	// reported as an error instead of panicking.
	if structs.NumField() < 3 {
		return nil, nil, nil, fmt.Errorf("labelfmt struct must have 3 fields (name, value, rename), got %d", structs.NumField())
	}
	names, values, renames := structs.Field(0), structs.Field(1), structs.Field(2)

	// listValues will repeat one copy for each line of input; we want to only grab one copy
	count := structs.Len() / sourceCol.Len()
	labelFmts := make([]log.LabelFmt, 0, count)
	for i := 0; i < count; i++ {
		rename, err := strconv.ParseBool(renames.ValueStr(i))
		if err != nil {
			return nil, nil, nil, fmt.Errorf("wrong format for labelFmt value rename: %w", err)
		}
		labelFmts = append(labelFmts, log.LabelFmt{
			Name:   names.ValueStr(i),
			Value:  values.ValueStr(i),
			Rename: rename,
		})
	}

	return sourceCol, nil, labelFmts, nil
}

func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool, bool, error) {
// Valid signatures:
// parse(sourceColVec, requestedKeys, strict, keepEmpty)
Expand Down Expand Up @@ -113,13 +225,13 @@ func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool
}

// parseFunc represents a function that parses a single line and returns key-value pairs
type parseFunc func(line string) (map[string]string, error)
type parseFunc func(recordRow arrow.RecordBatch, line string) (map[string]string, error)

// buildColumns builds Arrow columns from input lines using the provided parser
// Returns the column headers, the Arrow columns, and any error
func buildColumns(input *array.String, _ []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) {
func buildColumns(input arrow.RecordBatch, sourceCol *array.String, _ []string, parseFunc parseFunc, errorType string) ([]string, []arrow.Array) {
columnBuilders := make(map[string]*array.StringBuilder)
columnOrder := parseLines(input, columnBuilders, parseFunc, errorType)
columnOrder := parseLines(input, sourceCol, columnBuilders, parseFunc, errorType)

// Build final arrays
columns := make([]arrow.Array, 0, len(columnOrder))
Expand All @@ -135,14 +247,15 @@ func buildColumns(input *array.String, _ []string, parseFunc parseFunc, errorTyp
}

// parseLines discovers columns dynamically as lines are parsed
func parseLines(input *array.String, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string {
func parseLines(input arrow.RecordBatch, sourceCol *array.String, columnBuilders map[string]*array.StringBuilder, parseFunc parseFunc, errorType string) []string {
columnOrder := []string{}
var errorBuilder, errorDetailsBuilder *array.StringBuilder
hasErrorColumns := false

for i := 0; i < input.Len(); i++ {
line := input.Value(i)
parsed, err := parseFunc(line)
for i := 0; i < sourceCol.Len(); i++ {
line := sourceCol.Value(i)
// pass the corresponding row of input as well
parsed, err := parseFunc(input.NewSlice(int64(i), int64(i+1)), line)

// Handle error columns
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/engine/internal/executor/parse_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ var (
}
)

func buildJSONColumns(input *array.String, requestedKeys []string) ([]string, []arrow.Array) {
parseFunc := func(line string) (map[string]string, error) {
func buildJSONColumns(input arrow.RecordBatch, sourceCol *array.String, requestedKeys []string) ([]string, []arrow.Array) {
parseFunc := func(_ arrow.RecordBatch, line string) (map[string]string, error) {
return parseJSONLine(line, requestedKeys)
}
return buildColumns(input, requestedKeys, parseFunc, types.JSONParserErrorType)
return buildColumns(input, sourceCol, requestedKeys, parseFunc, types.JSONParserErrorType)
}

// parseJSONLine parses a single JSON line and extracts key-value pairs
Expand Down
72 changes: 72 additions & 0 deletions pkg/engine/internal/executor/parse_labelfmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package executor

import (
"fmt"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logql/log"
)

// buildLabelfmtColumns applies labelFmts to every row of sourceCol and returns
// the column headers and Arrow columns assembled by buildColumns.
//
// Each row is handed to tokenizeLabelfmt together with its matching slice of
// input. No requested keys are passed on, and errors are tagged with
// types.LabelfmtParserErrorType.
func buildLabelfmtColumns(input arrow.RecordBatch, sourceCol *array.String, labelFmts []log.LabelFmt) ([]string, []arrow.Array) {
	return buildColumns(
		input,
		sourceCol,
		nil,
		func(record arrow.RecordBatch, text string) (map[string]string, error) {
			return tokenizeLabelfmt(record, text, labelFmts)
		},
		types.LabelfmtParserErrorType,
	)
}

// tokenizeLabelfmt applies the label formats in labelFmts to a single row and
// returns the resulting values keyed by label name.
//
// Only row 0 of input is read, so callers are expected to pass a one-row slice
// of the record batch. Every column of that row is exposed to the formatter as
// a label, and line is passed to it as the log line. The row must contain the
// timestamp column (types.ColumnFullNameTimestamp).
//
// The returned map contains only labels whose names appear in labelFmts; all
// other labels carried over from the input row are dropped. An error is
// returned when the formatter cannot be created, the row is empty, the
// timestamp column is missing, or its value cannot be parsed.
func tokenizeLabelfmt(input arrow.RecordBatch, line string, labelFmts []log.LabelFmt) (map[string]string, error) {
	decoder, err := log.NewLabelsFormatter(labelFmts)
	if err != nil {
		return nil, fmt.Errorf("unable to create label formatter with formats %v: %w", labelFmts, err)
	}

	// Everything below reads row 0; reject an empty batch instead of letting
	// the column accessors index out of range.
	if input.NumRows() == 0 {
		return nil, fmt.Errorf("unable to format labels for an empty input row")
	}

	// The same label set is used as the builder's base labels and then
	// re-added as structured metadata after the reset, so build it once.
	lbls := buildLabelsFromInput(input)
	builder := log.NewBaseLabelsBuilder().ForLabels(lbls, labels.StableHash(lbls))
	builder.Reset()
	builder.Add(log.StructuredMetadataLabel, lbls)

	timestampIdx := -1
	for i := 0; i < int(input.NumCols()); i++ {
		if input.ColumnName(i) == types.ColumnFullNameTimestamp {
			timestampIdx = i
			break
		}
	}
	if timestampIdx < 0 {
		return map[string]string{}, fmt.Errorf("unable to find timestamp column in inputs")
	}

	// NOTE(review): this layout only accepts a 'T' separator and a literal
	// trailing 'Z' — confirm it matches what ValueStr emits for the
	// timestamp column type.
	rawTS := input.Column(timestampIdx).ValueStr(0)
	ts, err := time.Parse("2006-01-02T15:04:05.999999999Z", rawTS)
	if err != nil {
		// Report the failure as an error rather than panicking so the
		// caller can handle it like any other per-row parse error.
		return nil, fmt.Errorf("unable to convert timestamp %q: %w", rawTS, err)
	}

	decoder.Process(ts.UnixNano(), unsafeBytes(line), builder)
	result := builder.LabelsResult().Labels().Map()

	// result includes every single label from the input, not just the ones from labelFmts.
	// Remove the labels we don't care about and return only the new/adjusted labels.
	relevant := make(map[string]struct{}, len(labelFmts))
	for _, lf := range labelFmts {
		relevant[lf.Name] = struct{}{}
	}
	for name := range result {
		if _, keep := relevant[name]; !keep {
			delete(result, name)
		}
	}
	return result, nil
}

// buildLabelsFromInput turns row 0 of input into a label set with one label
// per column.
//
// The label name is the column component of the column's fully qualified
// name, and the label value is the string form of the column's first value.
// NOTE(review): MustParseFQN presumably panics on a malformed column name
// (Must prefix) — confirm against semconv.
func buildLabelsFromInput(input arrow.RecordBatch) labels.Labels {
	var collected []labels.Label
	for col := range int(input.NumCols()) {
		name := semconv.MustParseFQN(input.ColumnName(col)).ColumnRef().Column
		value := input.Column(col).ValueStr(0)
		collected = append(collected, labels.Label{Name: name, Value: value})
	}
	return labels.New(collected...)
}
Loading
Loading