diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d2017f8..561a229f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add the `Slice` filter [#175](https://github.com/AdRoll/baker/pull/175) +- Add LUA filter [#110](https://github.com/AdRoll/baker/pull/110) ### Changed diff --git a/filter/all.go b/filter/all.go index 71a7eecc..70603a4a 100644 --- a/filter/all.go +++ b/filter/all.go @@ -17,6 +17,7 @@ var All = []baker.FilterDesc{ ExternalMatchDesc, FormatTimeDesc, HashDesc, + LUADesc, MetadataLastModifiedDesc, MetadataUrlDesc, NotNullDesc, diff --git a/filter/lua.go b/filter/lua.go new file mode 100644 index 00000000..ed5fdc98 --- /dev/null +++ b/filter/lua.go @@ -0,0 +1,336 @@ +package filter + +import ( + "fmt" + "log" + "runtime" + "sync" + "sync/atomic" + + "github.com/AdRoll/baker" + + lua "github.com/yuin/gopher-lua" +) + +var luaHelp = ` +This filter runs a baker filter defined in a LUA script. +It's useful to quickly write and run a Baker filter without having to recompile Baker. + +This filter is based on [GopherLua](github.com/yuin/gopher-lua), which is an LUA5.1 virtual machine. + +To use this filter you need to declare a function in an lua file. This function serves the same purpose +of the equivalent ` + ticks("baker.Filter.Process") + ` method one would write for a Go filter. + +This is a simple filter which writes "hey" to the field "f0" of every record: +` + startBlock("lua") + ` +-- rec is a record object to be processed +-- next is the function next(record) to forward, if you want to, the record into the filter chain. +function myFilter(rec, next) + rec:set(fieldNames["f0"], "hey") + next(rec) +end +` + endBlock() + ` + +### The ` + ticks("Record") + ` table + +The first argument received by your lua filter function is a Record table. The Record table is an surrogate +for its Go counterpart, ` + ticks("baker.Record") + `. The Lua Record table defines the following methods: + +#### get + +- Go: ` + ticks("Record.Get(FieldIndex) []byte") + ` +- lua: ` + ticks("record:get(idx) -> string") + ` + +Same as its Go counterpart ` + ticks("record::get") + ` takes a integer representing the field index and returns its value. +But, unlike in Go where the value is a ` + ticks("[]byte]") + `, in lua it's returned as a string, to allow for fast protoyping. + +#### "set" + +- Go: ` + ticks("Record.Set(FieldIndex, []byte)") + ` +- lua: ` + ticks("record:set(idx, string)") + ` + +Same as its Go counterpart ` + ticks("record::set") + ` takes a integer representing the field index and the value to set. +But, unlike in Go where the value is a ` + ticks("[]byte]") + `, in lua it's a string, to allow for fast protoyping. + +#### "copy" + +- Go: ` + ticks("Record.Copy() Record") + ` +- lua: ` + ticks("record:copy() -> record") + ` + +Calling ` + ticks("record::copy") + ` returns a new record, a deep-copy of the original. + +#### "clear" + +- Go: ` + ticks("Record.Clear()") + ` +- lua: ` + ticks("record:clear()") + ` + +Calling ` + ticks("record::clear") + ` clears the records internal state, making all its fields empty. + + +### Global functions + +#### createRecord + +- Go: ` + ticks("Components.CreateRecord() Record") + ` +- lua: ` + ticks("createRecord() -> record") + ` + +` + ticks("createRecord") + ` is the lua equivalent of the ` + ticks("CreateRecord") + ` function passed to your filter during construction. +It allows to create a new Record instance. + +#### validateRecord + +- Go: ` + ticks("Components.ValidateRecord(Record) (bool, FieldIndex)") + ` +- lua: ` + ticks("validateRecord(record) -> (bool, int)") + ` + +` + ticks("validateRecord") + ` is the lua equivalent of the ` + ticks("ValidateRecord") + ` function passed to your filter during construction. +It validates a given record with respect to the validation function, returning a boolean indicating whether +the record is a valid one, if false, the returned integer indicates the index of the first invalid field it met. + +#### fieldByName + +- Go: ` + ticks("Components.FieldByName(string) (FieldIndex, bool)") + ` +- lua: ` + ticks("fieldByName(string) -> int|nil") + ` + +` + ticks("fieldByName") + ` is the lua equivalent of the ` + ticks("FieldByName") + ` function passed to your filter during construction. +It allows to lookup a field index by its name, returning the index or nil if no field exists with this index. + +### Global tables + +#### fieldNames + +- Go: ` + ticks("Components.FieldNames []string") + ` +- lua: ` + ticks("fieldNames") + ` + +` + ticks("fieldNames") + ` is an integer-indexed table, in other words an array, containing all field names, as ` + ticks("FieldNames") + ` in Go. +` + +// TODO(arl): ideally this functions should not be required, but writing +// markdown documentation in Go strings is tedious and error-prone. + +func ticks(s string) string { return "`" + s + "`" } +func startBlock(lang string) string { return "```" + lang } +func endBlock() string { return "```" } + +// LUADesc describes the LUA filter +var LUADesc = baker.FilterDesc{ + Name: "LUA", + New: NewLUA, + Config: &LUAConfig{}, + Help: luaHelp, +} + +// LUAConfig holds the configuration for the LUA filter. +type LUAConfig struct { + Script string `help:"Path to the lua script where the baker filter is defined" required:"true"` + FilterName string `help:"Name of the lua function to run as baker filter" required:"true"` +} + +// LUA allows to run a baker filter from an external script written in lua. +type LUA struct { + nfiltered int64 // for filter stats + + mu sync.Mutex + l *lua.LState // lua state used during all the baker filter lifetime + + // These do not need protection against concurrent calls as they're strictly + // read-only. + luaProcess lua.LValue // lua filter function + recordMt lua.LValue // lua record type meta table + +} + +// NewLUA returns a new LUA filter. +func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { + dcfg := cfg.DecodedConfig.(*LUAConfig) + + l := lua.NewState() + if err := l.DoFile(dcfg.Script); err != nil { + return nil, fmt.Errorf("can't compile lua script %q: %v", dcfg.Script, err) + } + + registerLUATypes(l, cfg.ComponentParams) + + luaFunc := l.GetGlobal(dcfg.FilterName) + if luaFunc.Type() == lua.LTNil { + return nil, fmt.Errorf("can't find lua filter function name %q in script %q", dcfg.FilterName, dcfg.Script) + } + + f := &LUA{ + l: l, + recordMt: l.GetTypeMetatable(luaRecordTypeName), + luaProcess: luaFunc, + } + + // Since a filter has no way to know when it's deallocated we set a + // finaliser on the lua state instance, which gives us the occasion to close + // it. + runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) + + return f, nil +} + +// registerLUATypes registers, in the given lua state, some lua types +// and utility functions useful to run a baker filter: +// - the record type +// - createRecord function (creates and returns a new record) +// - validateRecord function (takes a record and returns a boolean and a +// number), see baker.ComponentParams.ValidateRecord +// - fieldByName function (returns a field index given its name, or nil +// if the given field name doesn't exist) +// - fieldNames, an lua table where field names are indexed by their field +// field indexes +func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { + registerLUARecordType(l) + + l.SetGlobal("validateRecord", l.NewFunction(func(L *lua.LState) int { + luar := fastcheckLuaRecord(l, 1) + ok, fidx := comp.ValidateRecord(luar.r) + l.Push(lua.LBool(ok)) + l.Push(lua.LNumber(fidx)) + return 2 + })) + + l.SetGlobal("fieldByName", l.NewFunction(func(L *lua.LState) int { + fname := L.CheckString(1) + fidx, ok := comp.FieldByName(fname) + if !ok { + l.Push(lua.LNil) + } else { + l.Push(lua.LNumber(fidx)) + } + return 1 + })) + + // Create the fieldNaames table. + fields := l.NewTable() + for fidx, fname := range comp.FieldNames { + fields.RawSet(lua.LNumber(fidx), lua.LString(fname)) + } + l.SetGlobal("fieldNames", fields) +} + +func (t *LUA) Stats() baker.FilterStats { + return baker.FilterStats{ + NumFilteredLines: atomic.LoadInt64(&t.nfiltered), + } +} + +// TODO(arl) Look at https://github.com/yuin/gopher-lua#the-lstate-pool-pattern + +// Process forwards records to the lua-written filter. +func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { + // Wrap the incoming record into an lua user data having the 'record' type meta-table. + t.mu.Lock() + ud := t.l.NewUserData() + t.l.SetMetatable(ud, t.recordMt) + ud.Value = &luaRecord{r: rec} + + err := t.l.CallByParam(lua.P{ + Fn: t.luaProcess, + NRet: 2, + Protect: true, + }, ud) + t.mu.Unlock() + + if err != nil { + // this should cause panic at runtime + panic(err) + } + + keep := t.l.Get(-2).(lua.LBool) + luaErr := t.l.Get(-1) + t.l.Pop(2) + if luaErr != lua.LNil { + log.Printf("error from lua function: %v", luaErr.(lua.LString)) + return + } + + if !keep { + atomic.AddInt64(&t.nfiltered, 1) + return + } + + next(rec) +} + +// lua record methods + +const luaRecordTypeName = "record" + +// registers the 'record' type into the given lua state. +func registerLUARecordType(L *lua.LState) { + mt := L.NewTypeMetatable(luaRecordTypeName) + L.SetGlobal(luaRecordTypeName, mt) + L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), luaRecordMethods)) +} + +// converts a baker.Record to lua user data, suitable to be pushed onto +// an lua stack. +func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { + ud := L.NewUserData() + ud.Value = &luaRecord{r: r} + // TODO(arl) record type metatable likely never changes so we could avoid + // the extra lookup here + L.SetMetatable(ud, L.GetTypeMetatable(luaRecordTypeName)) + return ud +} + +// checks that the element at current stack index n is an lua +// user data, holding an luaRecord, and returns it. Raises an lua +// runtime error if the element is not a record. +func checkLuaRecord(L *lua.LState, n int) *luaRecord { + ud := L.CheckUserData(n) + if v, ok := ud.Value.(*luaRecord); ok { + return v + } + L.ArgError(n, fmt.Sprintf("record expected, got %#v", ud.Value)) + return nil +} + +// faster version of checkLuaRecord that panics if the stack element +// is not a record, rather than raising an lua runtime error. +func fastcheckLuaRecord(L *lua.LState, n int) *luaRecord { + return L.Get(n).(*lua.LUserData).Value.(*luaRecord) +} + +// holds the lua-bound methods of baker.Record. +var luaRecordMethods = map[string]lua.LGFunction{ + "get": luaRecordGet, + "set": luaRecordSet, + "clear": luaRecordClear, +} + +// lua wrapper over a baker Record. +type luaRecord struct { + r baker.Record +} + +// record:get(int) returns string +func luaRecordGet(L *lua.LState) int { + luar := fastcheckLuaRecord(L, 1) + fidx := L.CheckInt(2) + + buf := luar.r.Get(baker.FieldIndex(fidx)) + + L.Push(lua.LString(string(buf))) + return 1 +} + +// record:set(int, string) +func luaRecordSet(L *lua.LState) int { + luar := fastcheckLuaRecord(L, 1) + fidx := L.CheckInt(2) + val := L.CheckString(3) + + luar.r.Set(baker.FieldIndex(fidx), []byte(val)) + + return 0 +} + +// record:clear() +func luaRecordClear(L *lua.LState) int { + luar := fastcheckLuaRecord(L, 1) + luar.r.Clear() + + return 0 +} diff --git a/filter/lua_test.go b/filter/lua_test.go new file mode 100644 index 00000000..8cbbde0c --- /dev/null +++ b/filter/lua_test.go @@ -0,0 +1,177 @@ +package filter + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/AdRoll/baker" +) + +func BenchmarkLUAProcess(b *testing.B) { + b.ReportAllocs() + const script = ` +-- rec is a record object +-- next is function next(record) +function dummy(rec, next) + rec:set(0, "hey") + next(rec) +end +` + + dir, err := ioutil.TempDir("", b.Name()) + if err != nil { + b.Fatal(err) + } + + fname := filepath.Join(dir, "filters.lua") + if err := ioutil.WriteFile(fname, []byte(script), os.ModePerm); err != nil { + b.Fatalf("can't write lua script: %v", err) + } + + record := &baker.LogLine{} + + fieldByName := func(name string) (baker.FieldIndex, bool) { + switch name { + case "foo": + return 0, true + case "bar": + return 1, true + case "baz": + return 2, true + } + return 0, false + } + + f, err := NewLUA(baker.FilterParams{ + ComponentParams: baker.ComponentParams{ + FieldByName: fieldByName, + DecodedConfig: &LUAConfig{ + Script: fname, + FilterName: "dummy", + }, + }, + }) + + if err != nil { + b.Fatalf("NewLUA error = %v", err) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + f.Process(record, func(baker.Record) {}) + } +} + +func TestLUAFilter(t *testing.T) { + // This is the lua script containing the lua functions used in the test cases. + fname := filepath.Join("testdata", "lua_test.lua") + + fieldNames := []string{"foo", "bar", "baz"} + fieldByName := func(name string) (baker.FieldIndex, bool) { + for i, n := range fieldNames { + if n == name { + return baker.FieldIndex(i), true + } + } + + return 0, false + } + + tests := []struct { + name string // both test case name and lua filter name + record string + wantErr bool // configuration-time error + want [][3]string // contains non discarded records with, for each of them, the 3 fields we want + }{ + { + name: "swapFields", + record: "abc,def,ghi", + want: [][3]string{ + {"abc", "ghi", "def"}, + }, + }, + { + name: "_fieldByName", + record: "abc,def,ghi", + want: [][3]string{ + {"abc", "ghi", "def"}, + }, + }, + { + name: "_fieldNames", + record: "abc,def,ghi", + want: [][3]string{ + {"foo", "bar", "baz"}, + }, + }, + { + name: "clearRecord", + record: "foo,bar,baz", + want: [][3]string{ + {"", "", ""}, + }, + }, + + // error cases + { + name: "doesNotExist", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f, err := NewLUA(baker.FilterParams{ + ComponentParams: baker.ComponentParams{ + FieldByName: fieldByName, + FieldNames: fieldNames, + CreateRecord: func() baker.Record { + return &baker.LogLine{FieldSeparator: ','} + }, + ValidateRecord: func(r baker.Record) (bool, baker.FieldIndex) { + if string(r.Get(0)) != "hello" { + return false, 0 + } + return true, -1 + }, + DecodedConfig: &LUAConfig{ + Script: fname, + FilterName: tt.name, + }, + }, + }) + + if (err != nil) != (tt.wantErr) { + t.Fatalf("got error = %v, want error = %v", err, tt.wantErr) + } + + if tt.wantErr { + return + } + + l := &baker.LogLine{FieldSeparator: ','} + if err := l.Parse([]byte(tt.record), nil); err != nil { + t.Fatalf("parse error: %q", err) + } + + var got []baker.Record + f.Process(l, func(r baker.Record) { got = append(got, r) }) + + // Check the number of non discarded records match + if len(got) != len(tt.want) { + t.Fatalf("got %d non-discarded records, want %d", len(got), len(tt.want)) + } + + for recidx, rec := range tt.want { + for fidx, fval := range rec { + f := got[recidx].Get(baker.FieldIndex(fidx)) + if !bytes.Equal(f, []byte(fval)) { + t.Errorf("got record[%d].Get(%d) = %q, want %q", recidx, fidx, string(f), fval) + } + } + } + }) + } +} diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua new file mode 100644 index 00000000..6e42bd29 --- /dev/null +++ b/filter/testdata/lua_test.lua @@ -0,0 +1,32 @@ +function swapFields(rec) + local f1, f2 = rec:get(1), rec:get(2) + rec:set(1, f2) + rec:set(2, f1) + return true, nil +end + +function errorFromLua(rec) + return false, "error from lua" +end + + +function _fieldByName(rec) + local f1, f2 + f1 = rec:get(fieldByName("bar")) + rec:set(1, rec:get(fieldByName("baz"))) + rec:set(2, f1) + return true, nil +end + +function _fieldNames(rec) + -- set each field to its name + rec:set(0, fieldNames[0]) + rec:set(1, fieldNames[1]) + rec:set(2, fieldNames[2]) + return true, nil +end + +function clearRecord(rec) + rec:clear() + return true, nil +end \ No newline at end of file diff --git a/go.mod b/go.mod index deeb47ad..b0d7ac42 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/valyala/gozstd v1.9.0 github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f github.com/yuin/goldmark v1.2.1 // indirect + github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect ) diff --git a/go.sum b/go.sum index 1603eef8..4941f896 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,9 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/charmbracelet/glamour v0.2.0 h1:mTgaiNiumpqTZp3qVM6DH9UB0NlbY17wejoMf1kM8Pg= github.com/charmbracelet/glamour v0.2.0/go.mod h1:UA27Kwj3QHialP74iU6C+Gpc8Y7IOAKupeKMLLBURWM= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 h1:y5HC9v93H5EPKqaS1UYVg1uYah5Xf51mBfIoWehClUQ= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964/go.mod h1:Xd9hchkHSWYkEqJwUGisez3G1QY8Ryz0sdWrLPMGjLk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -148,6 +151,8 @@ github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f/go.mod h1:a5S github.com/yuin/goldmark v1.2.0/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= +github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -168,6 +173,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=