From f2d1a09d269ee157754ce7f6378f93854b5713c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Thu, 3 Dec 2020 20:32:19 +0100 Subject: [PATCH 01/21] filter: lua: initial commit --- filter/all.go | 1 + filter/lua.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 6 +++ 4 files changed, 130 insertions(+) create mode 100644 filter/lua.go diff --git a/filter/all.go b/filter/all.go index 71a7eecc..70603a4a 100644 --- a/filter/all.go +++ b/filter/all.go @@ -17,6 +17,7 @@ var All = []baker.FilterDesc{ ExternalMatchDesc, FormatTimeDesc, HashDesc, + LUADesc, MetadataLastModifiedDesc, MetadataUrlDesc, NotNullDesc, diff --git a/filter/lua.go b/filter/lua.go new file mode 100644 index 00000000..36b9e68c --- /dev/null +++ b/filter/lua.go @@ -0,0 +1,122 @@ +package filter + +import ( + "fmt" + "runtime" + + "github.com/AdRoll/baker" + lua "github.com/yuin/gopher-lua" +) + +// LUADesc describes the LUA filter +var LUADesc = baker.FilterDesc{ + Name: "LUA", + New: NewLUA, + Config: &LUAConfig{}, + Help: `TBD`, +} + +type LUAConfig struct { + Script string + FilterName string +} + +type LUA struct { + l *lua.LState + funcName string +} + +func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { + dcfg := cfg.DecodedConfig.(*LUAConfig) + + l := lua.NewState() + if err := l.DoFile(dcfg.Script); err != nil { + return nil, fmt.Errorf("can't compile lua script %q: %v", dcfg.Script, err) + } + registerLUARecordType(l) + + f := &LUA{ + funcName: dcfg.FilterName, + l: l, + } + + runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) + + return f, nil +} + +func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } + +func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { + luaNext := t.l.NewFunction(func(L *lua.LState) int { + recordArg := checkLuaRecord(L, 1) + next(recordArg.r) + return 0 + }) + + err := t.l.CallByParam(lua.P{ + Fn: t.l.GetGlobal(t.funcName), + NRet: 0, + Protect: true, + }, recordToLua(t.l, 
rec), + luaNext) + + if err != nil { + panic(err) + } +} + +const luaRecordTypeName = "record" + +func registerLUARecordType(L *lua.LState) { + mt := L.NewTypeMetatable(luaRecordTypeName) + L.SetGlobal(luaRecordTypeName, mt) + L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), luaRecordMethods)) +} + +func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { + ud := L.NewUserData() + ud.Value = &luaRecord{r: r} + L.SetMetatable(ud, L.GetTypeMetatable(luaRecordTypeName)) + return ud +} + +var luaRecordMethods = map[string]lua.LGFunction{ + "get": luaRecordGet, + "set": luaRecordSet, +} + +type luaRecord struct { + r baker.Record +} + +func checkLuaRecord(L *lua.LState, n int) *luaRecord { + ud := L.CheckUserData(n) + if v, ok := ud.Value.(*luaRecord); ok { + return v + } + L.ArgError(n, fmt.Sprintf("record expected, got %#v", ud.Value)) + return nil +} + +// record:get(int) returns string +func luaRecordGet(L *lua.LState) int { + luar := checkLuaRecord(L, 1) + fidx := L.CheckInt(2) + + buf := luar.r.Get(baker.FieldIndex(fidx)) + + L.Push(lua.LString(string(buf))) + return 1 +} + +// record:set(int, string) +func luaRecordSet(L *lua.LState) int { + luar := checkLuaRecord(L, 1) + fidx := L.CheckInt(2) + val := L.CheckString(3) + + luar.r.Set(baker.FieldIndex(fidx), []byte(val)) + + return 0 +} diff --git a/go.mod b/go.mod index deeb47ad..fe61d1be 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/valyala/gozstd v1.9.0 github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f github.com/yuin/goldmark v1.2.1 // indirect + github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect ) diff --git a/go.sum b/go.sum index 1603eef8..e37c7599 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,9 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/cespare/xxhash v1.1.0/go.mod 
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/charmbracelet/glamour v0.2.0 h1:mTgaiNiumpqTZp3qVM6DH9UB0NlbY17wejoMf1kM8Pg= github.com/charmbracelet/glamour v0.2.0/go.mod h1:UA27Kwj3QHialP74iU6C+Gpc8Y7IOAKupeKMLLBURWM= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 h1:y5HC9v93H5EPKqaS1UYVg1uYah5Xf51mBfIoWehClUQ= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964/go.mod h1:Xd9hchkHSWYkEqJwUGisez3G1QY8Ryz0sdWrLPMGjLk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -148,6 +151,8 @@ github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f/go.mod h1:a5S github.com/yuin/goldmark v1.2.0/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= +github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -168,6 +173,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From 06484d85d323346d001348864fdf902e9a977819 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 7 Dec 2020 12:29:25 +0100 Subject: [PATCH 02/21] filter: lua: fastcheckLuaRecord --- filter/lua.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 36b9e68c..53faea33 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -22,8 +22,8 @@ type LUAConfig struct { } type LUA struct { - l *lua.LState - funcName string + l *lua.LState + luaFunc lua.LValue } func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { @@ -34,10 +34,12 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { return nil, fmt.Errorf("can't compile lua script %q: %v", dcfg.Script, err) } registerLUARecordType(l) + // TODO: check function exists + luaFunc := l.GetGlobal(dcfg.FilterName) f := &LUA{ - funcName: dcfg.FilterName, - l: l, + luaFunc: luaFunc, + l: l, } runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) @@ -49,13 +51,13 @@ func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { luaNext := t.l.NewFunction(func(L *lua.LState) int { - recordArg := checkLuaRecord(L, 1) + recordArg := fastcheckLuaRecord(L, 1) next(recordArg.r) return 0 }) err := t.l.CallByParam(lua.P{ - Fn: t.l.GetGlobal(t.funcName), + Fn: t.luaFunc, NRet: 0, Protect: 
true, }, recordToLua(t.l, rec), @@ -99,9 +101,13 @@ func checkLuaRecord(L *lua.LState, n int) *luaRecord { return nil } +func fastcheckLuaRecord(L *lua.LState, n int) *luaRecord { + return L.Get(n).(*lua.LUserData).Value.(*luaRecord) +} + // record:get(int) returns string func luaRecordGet(L *lua.LState) int { - luar := checkLuaRecord(L, 1) + luar := fastcheckLuaRecord(L, 1) fidx := L.CheckInt(2) buf := luar.r.Get(baker.FieldIndex(fidx)) @@ -112,7 +118,7 @@ func luaRecordGet(L *lua.LState) int { // record:set(int, string) func luaRecordSet(L *lua.LState) int { - luar := checkLuaRecord(L, 1) + luar := fastcheckLuaRecord(L, 1) fidx := L.CheckInt(2) val := L.CheckString(3) From 09c417546da01df4c6db04e907fe8969eb7d5fbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 7 Dec 2020 12:36:02 +0100 Subject: [PATCH 03/21] filter: lua: reduce allocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Suppress one allocation, reducing their number from 5 to 4, by pre-allocating the userdata we use to wrap the record passed to the lua filter. name old time/op new time/op delta LUAProcess-8 638ns ± 4% 559ns ± 6% -12.31% (p=0.000 n=10+10) name old alloc/op new alloc/op delta LUAProcess-8 152B ± 0% 104B ± 0% -31.58% (p=0.000 n=10+10) name old allocs/op new allocs/op delta LUAProcess-8 5.00 ± 0% 4.00 ± 0% -20.00% (p=0.000 n=10+10) --- filter/lua.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/filter/lua.go b/filter/lua.go index 53faea33..67fc66db 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -23,6 +23,7 @@ type LUAConfig struct { type LUA struct { l *lua.LState + ud *lua.LUserData luaFunc lua.LValue } @@ -37,9 +38,14 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { // TODO: check function exists luaFunc := l.GetGlobal(dcfg.FilterName) + // Preallocate the userdata we use to wrap the record passed to the filter. 
+ ud := l.NewUserData() + l.SetMetatable(ud, l.GetTypeMetatable(luaRecordTypeName)) + f := &LUA{ luaFunc: luaFunc, l: l, + ud: ud, } runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) @@ -56,11 +62,14 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { return 0 }) + // Modify the record inside the pre-allocated user value + t.ud.Value = &luaRecord{r: rec} + err := t.l.CallByParam(lua.P{ Fn: t.luaFunc, NRet: 0, Protect: true, - }, recordToLua(t.l, rec), + }, t.ud, luaNext) if err != nil { From 34c5e4816e716d6dcf59bec5c2401b6523b936ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 7 Dec 2020 12:46:56 +0100 Subject: [PATCH 04/21] filter: lua: preallocate the next function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduce allocation from 4 to 2 by preallocating the lua function which is a wrapper to the Go 'next' function we receive. name old time/op new time/op delta LUAProcess-8 559ns ± 6% 459ns ± 7% -17.97% (p=0.000 n=10+10) name old alloc/op new alloc/op delta LUAProcess-8 104B ± 0% 24B ± 0% -76.92% (p=0.000 n=10+10) name old allocs/op new allocs/op delta LUAProcess-8 4.00 ± 0% 2.00 ± 0% -50.00% (p=0.000 n=10+10) --- filter/lua.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 67fc66db..eb99d7ca 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -25,6 +25,8 @@ type LUA struct { l *lua.LState ud *lua.LUserData luaFunc lua.LValue + luaNext *lua.LFunction + next func(baker.Record) } func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { @@ -48,6 +50,12 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { ud: ud, } + // Preallocate the lua next function passed to the filter + f.luaNext = l.NewFunction(func(L *lua.LState) int { + f.next(fastcheckLuaRecord(L, 1).r) + return 0 + }) + runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) return f, nil @@ -56,21 +64,18 @@ func 
NewLUA(cfg baker.FilterParams) (baker.Filter, error) { func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { - luaNext := t.l.NewFunction(func(L *lua.LState) int { - recordArg := fastcheckLuaRecord(L, 1) - next(recordArg.r) - return 0 - }) - // Modify the record inside the pre-allocated user value t.ud.Value = &luaRecord{r: rec} + // Set the next function which is called by the lua filter to the one + // we just received. + t.next = next + err := t.l.CallByParam(lua.P{ Fn: t.luaFunc, NRet: 0, Protect: true, - }, t.ud, - luaNext) + }, t.ud, t.luaNext) if err != nil { panic(err) From edbff9ccc91f44c90bf62257aaedd5c207dd9255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 12 Dec 2020 12:46:23 +0100 Subject: [PATCH 05/21] filter: lua: bind fields, createRecord and validateRecord --- filter/lua.go | 31 ++++++- filter/lua_test.go | 171 +++++++++++++++++++++++++++++++++++ filter/testdata/lua_test.lua | 34 +++++++ 3 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 filter/lua_test.go create mode 100644 filter/testdata/lua_test.lua diff --git a/filter/lua.go b/filter/lua.go index eb99d7ca..1316341f 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -31,12 +31,13 @@ type LUA struct { func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { dcfg := cfg.DecodedConfig.(*LUAConfig) - l := lua.NewState() if err := l.DoFile(dcfg.Script); err != nil { return nil, fmt.Errorf("can't compile lua script %q: %v", dcfg.Script, err) } - registerLUARecordType(l) + + registerLUATypes(l, cfg.ComponentParams) + // TODO: check function exists luaFunc := l.GetGlobal(dcfg.FilterName) @@ -61,6 +62,32 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { return f, nil } +func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { + registerLUARecordType(l) + + l.SetGlobal("createRecord", l.NewFunction(func(L *lua.LState) int { + rec := 
comp.CreateRecord() + ud := recordToLua(l, rec) + L.Push(ud) + return 1 + })) + + l.SetGlobal("validateRecord", l.NewFunction(func(L *lua.LState) int { + luar := fastcheckLuaRecord(l, 1) + ok, fidx := comp.ValidateRecord(luar.r) + l.Push(lua.LBool(ok)) + l.Push(lua.LNumber(fidx)) + return 2 + })) + + // Create the fields table. + fields := l.NewTable() + for i, n := range comp.FieldNames { + fields.RawSetString(n, lua.LNumber(i)) + } + l.SetGlobal("fieldNames", fields) +} + func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { diff --git a/filter/lua_test.go b/filter/lua_test.go new file mode 100644 index 00000000..e4e58765 --- /dev/null +++ b/filter/lua_test.go @@ -0,0 +1,171 @@ +package filter + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/AdRoll/baker" +) + +func BenchmarkLUAProcess(b *testing.B) { + b.ReportAllocs() + const script = ` +-- rec is a record object +-- next is function next(record) +function dummy(rec, next) + rec:set(0, "hey") + next(rec) +end +` + + dir, err := ioutil.TempDir("", b.Name()) + if err != nil { + b.Fatal(err) + } + // fname := filepath.Join(b.TempDir(), "filters.lua") + fname := filepath.Join(dir, "filters.lua") + if err := ioutil.WriteFile(fname, []byte(script), os.ModePerm); err != nil { + b.Fatalf("can't write lua script: %v", err) + } + + record := &baker.LogLine{} + + fieldByName := func(name string) (baker.FieldIndex, bool) { + switch name { + case "foo": + return 0, true + case "bar": + return 1, true + case "baz": + return 2, true + } + return 0, false + } + + f, err := NewLUA(baker.FilterParams{ + ComponentParams: baker.ComponentParams{ + FieldByName: fieldByName, + DecodedConfig: &LUAConfig{ + Script: fname, + FilterName: "dummy", + }, + }, + }) + + if err != nil { + b.Fatalf("NewLUA error = %v", err) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + f.Process(record, func(baker.Record) {}) + } 
+} + +func TestLUAFilter(t *testing.T) { + // This is the lua script containing the lua functions used in the test cases. + fname := filepath.Join("testdata", "lua_test.lua") + + fieldNames := []string{"foo", "bar", "baz"} + fieldByName := func(name string) (baker.FieldIndex, bool) { + for i, n := range fieldNames { + if n == name { + return baker.FieldIndex(i), true + } + } + + return 0, false + } + + tests := []struct { + name string // both test case name and lua filter name + record string + wantErr bool // configuration-time error + want [][3]string // contains non discarded records with, for each of them, the 3 fields we want + }{ + { + name: "swapFieldsWithIndex", + record: "abc,def,ghi", + want: [][3]string{ + {"abc", "ghi", "def"}, + }, + }, + { + name: "swapFieldsWithNames", + record: "abc,def,ghi", + want: [][3]string{ + {"abc", "ghi", "def"}, + }, + }, + { + name: "_createRecord", + record: "abc,def,ghi", + want: [][3]string{ + {"hey", "ho", "let's go!"}, + {"abc", "def", "ghi"}, + }, + }, + { + name: "_validateRecord", + record: "ciao,,", + want: [][3]string{ + {"good", "", ""}, + }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f, err := NewLUA(baker.FilterParams{ + ComponentParams: baker.ComponentParams{ + FieldByName: fieldByName, + FieldNames: fieldNames, + CreateRecord: func() baker.Record { + return &baker.LogLine{FieldSeparator: ','} + }, + ValidateRecord: func(r baker.Record) (bool, baker.FieldIndex) { + if string(r.Get(0)) != "hello" { + return false, 0 + } + return true, -1 + }, + DecodedConfig: &LUAConfig{ + Script: fname, + FilterName: tt.name, + }, + }, + }) + + if (err != nil) != (tt.wantErr) { + t.Fatalf("got error = %v, want error = %v", err, tt.wantErr) + } + + if tt.wantErr { + return + } + + l := &baker.LogLine{FieldSeparator: ','} + if err := l.Parse([]byte(tt.record), nil); err != nil { + t.Fatalf("parse error: %q", err) + } + + var got []baker.Record + f.Process(l, func(r baker.Record) { got = 
append(got, r) }) + + // Check the number of non discarded records match + if len(got) != len(tt.want) { + t.Fatalf("got %d non-discarded records, want %d", len(got), len(tt.want)) + } + + for recidx, rec := range tt.want { + for fidx, fval := range rec { + f := got[recidx].Get(baker.FieldIndex(fidx)) + if !bytes.Equal(f, []byte(fval)) { + t.Errorf("got record[%d].Get(%d) = %q, want %q", recidx, fidx, string(f), fval) + } + } + } + }) + } +} diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua new file mode 100644 index 00000000..30fa0c46 --- /dev/null +++ b/filter/testdata/lua_test.lua @@ -0,0 +1,34 @@ +function swapFieldsWithIndex(rec, next) + local f1, f2 + f1 = rec:get(1) + rec:set(1, rec:get(2)) + rec:set(2, f1) + next(rec) +end + +function swapFieldsWithNames(rec, next) + local f1, f2 + f1 = rec:get(fieldNames["bar"]) + rec:set(1, rec:get(fieldNames["baz"])) + rec:set(2, f1) + next(rec) +end + +function _createRecord(rec, next) + newrec = createRecord() + newrec:set(0, "hey") + newrec:set(1, "ho") + newrec:set(2, "let's go!") + next(newrec) + next(rec) +end + +function _validateRecord(rec, next) + ok, idx = validateRecord(rec) + if ok == false and idx == 0 then + rec:set(0, "good") + else + rec:set(0, "bad") + end + next(rec) +end From 065649deff7902bcd38bcfc6f96c764dd3017866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 12 Dec 2020 20:42:35 +0100 Subject: [PATCH 06/21] filter: lua: implement lua record copy+clear --- filter/lua.go | 41 ++++++++++++++++++++++++++++-------- filter/lua_test.go | 18 +++++++++++++++- filter/testdata/lua_test.lua | 14 ++++++++++++ 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 1316341f..ec5c6aea 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -109,6 +109,8 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { } } +// lua record methods + const luaRecordTypeName = "record" func registerLUARecordType(L 
*lua.LState) { @@ -124,15 +126,6 @@ func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { return ud } -var luaRecordMethods = map[string]lua.LGFunction{ - "get": luaRecordGet, - "set": luaRecordSet, -} - -type luaRecord struct { - r baker.Record -} - func checkLuaRecord(L *lua.LState, n int) *luaRecord { ud := L.CheckUserData(n) if v, ok := ud.Value.(*luaRecord); ok { @@ -146,6 +139,17 @@ func fastcheckLuaRecord(L *lua.LState, n int) *luaRecord { return L.Get(n).(*lua.LUserData).Value.(*luaRecord) } +var luaRecordMethods = map[string]lua.LGFunction{ + "get": luaRecordGet, + "set": luaRecordSet, + "copy": luaRecordCopy, + "clear": luaRecordClear, +} + +type luaRecord struct { + r baker.Record +} + // record:get(int) returns string func luaRecordGet(L *lua.LState) int { luar := fastcheckLuaRecord(L, 1) @@ -167,3 +171,22 @@ func luaRecordSet(L *lua.LState) int { return 0 } + +// record:copy() record +func luaRecordCopy(L *lua.LState) int { + luar := fastcheckLuaRecord(L, 1) + + cpy := luar.r.Copy() + ud := recordToLua(L, cpy) + L.Push(ud) + + return 1 +} + +// record:clear() +func luaRecordClear(L *lua.LState) int { + luar := fastcheckLuaRecord(L, 1) + luar.r.Clear() + + return 0 +} diff --git a/filter/lua_test.go b/filter/lua_test.go index e4e58765..50acbce6 100644 --- a/filter/lua_test.go +++ b/filter/lua_test.go @@ -113,7 +113,23 @@ func TestLUAFilter(t *testing.T) { record: "ciao,,", want: [][3]string{ {"good", "", ""}, - }}, + }, + }, + { + name: "clearRecord", + record: "foo,bar,baz", + want: [][3]string{ + {"", "", ""}, + }, + }, + { + name: "copyRecord", + record: "foo,bar,baz", + want: [][3]string{ + {"foo", "bar", "1"}, + {"foo", "bar", "2"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua index 30fa0c46..ae5d8253 100644 --- a/filter/testdata/lua_test.lua +++ b/filter/testdata/lua_test.lua @@ -32,3 +32,17 @@ function _validateRecord(rec, next) end 
next(rec) end + +function clearRecord(rec, next) + rec:clear() + next(rec) +end + +function copyRecord(rec, next) + rec:set(2, "1") + cpy = rec:copy() + next(rec) + + cpy:set(2, "2") + next(cpy) +end From 702495407462d5ec1765b7e62ec9eb598211ebb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 12 Dec 2020 20:45:54 +0100 Subject: [PATCH 07/21] filter: lua: check lua filter function exists --- filter/lua.go | 4 +++- filter/lua_test.go | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/filter/lua.go b/filter/lua.go index ec5c6aea..7ec0a297 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -38,8 +38,10 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { registerLUATypes(l, cfg.ComponentParams) - // TODO: check function exists luaFunc := l.GetGlobal(dcfg.FilterName) + if luaFunc.Type() == lua.LTNil { + return nil, fmt.Errorf("can't find lua filter %q in script %q", dcfg.FilterName, dcfg.Script) + } // Preallocate the userdata we use to wrap the record passed to the filter. 
ud := l.NewUserData() diff --git a/filter/lua_test.go b/filter/lua_test.go index 50acbce6..ab1aa9a2 100644 --- a/filter/lua_test.go +++ b/filter/lua_test.go @@ -130,6 +130,12 @@ func TestLUAFilter(t *testing.T) { {"foo", "bar", "2"}, }, }, + + // error cases + { + name: "doesNotExist", + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 7a2fa435d8dd620fa2fed703e5bb8bb8874b5066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 12 Dec 2020 20:48:13 +0100 Subject: [PATCH 08/21] filter: lua: add help strings --- filter/lua.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 7ec0a297..4550c707 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -13,12 +13,12 @@ var LUADesc = baker.FilterDesc{ Name: "LUA", New: NewLUA, Config: &LUAConfig{}, - Help: `TBD`, + Help: `Run a baker filter defined in a lua script`, } type LUAConfig struct { - Script string - FilterName string + Script string `help:"Path to the lua script where the baker filter is defined" required:"true"` + FilterName string `help:"Name of the lua function to run as baker filter" required:"true"` } type LUA struct { From fff20180065e0f23862bcdc58bf9277e6aa3c95b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 12 Dec 2020 20:57:11 +0100 Subject: [PATCH 09/21] filter: lua: cosmetics + docs --- filter/lua.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 4550c707..22c05d92 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -22,15 +22,16 @@ type LUAConfig struct { } type LUA struct { - l *lua.LState - ud *lua.LUserData - luaFunc lua.LValue - luaNext *lua.LFunction + l *lua.LState // lua state used during all the baker filter lifetime + ud *lua.LUserData // pre-allocated (reused) userdata for the processed record + luaFunc lua.LValue // lua filter function + luaNext *lua.LFunction 
// lua next function (reused) next func(baker.Record) } func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { dcfg := cfg.DecodedConfig.(*LUAConfig) + l := lua.NewState() if err := l.DoFile(dcfg.Script); err != nil { return nil, fmt.Errorf("can't compile lua script %q: %v", dcfg.Script, err) @@ -44,21 +45,25 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { } // Preallocate the userdata we use to wrap the record passed to the filter. + // We can do this since a single instance of a baker filter is only ever + // processing a single record at a time, so we can reuse the lua userdata + // structure for it. This reduces allocations. ud := l.NewUserData() l.SetMetatable(ud, l.GetTypeMetatable(luaRecordTypeName)) - f := &LUA{ - luaFunc: luaFunc, - l: l, - ud: ud, - } + f := &LUA{} - // Preallocate the lua next function passed to the filter - f.luaNext = l.NewFunction(func(L *lua.LState) int { + // Preallocate the lua next function passed to the filter. + luaNext := l.NewFunction(func(L *lua.LState) int { f.next(fastcheckLuaRecord(L, 1).r) return 0 }) + f.l = l + f.ud = ud + f.luaNext = luaNext + f.luaFunc = luaFunc + runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) return f, nil From b9d204f44574f538ed520e4cb2d262cd0a199d16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sun, 13 Dec 2020 22:31:18 +0100 Subject: [PATCH 10/21] filter: lua: fieldName and fieldByName --- filter/lua.go | 17 ++++++++++++++--- filter/lua_test.go | 11 +++++++++-- filter/testdata/lua_test.lua | 16 ++++++++++++---- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 22c05d92..45f2eab3 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -87,10 +87,21 @@ func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { return 2 })) - // Create the fields table. 
+ l.SetGlobal("fieldByName", l.NewFunction(func(L *lua.LState) int { + fname := L.CheckString(1) + fidx, ok := comp.FieldByName(fname) + if !ok { + l.Push(lua.LNil) + } else { + l.Push(lua.LNumber(fidx)) + } + return 1 + })) + + // Create the fieldNaames table. fields := l.NewTable() - for i, n := range comp.FieldNames { - fields.RawSetString(n, lua.LNumber(i)) + for fidx, fname := range comp.FieldNames { + fields.RawSet(lua.LNumber(fidx), lua.LString(fname)) } l.SetGlobal("fieldNames", fields) } diff --git a/filter/lua_test.go b/filter/lua_test.go index ab1aa9a2..a8003bf6 100644 --- a/filter/lua_test.go +++ b/filter/lua_test.go @@ -87,19 +87,26 @@ func TestLUAFilter(t *testing.T) { want [][3]string // contains non discarded records with, for each of them, the 3 fields we want }{ { - name: "swapFieldsWithIndex", + name: "swapFields", record: "abc,def,ghi", want: [][3]string{ {"abc", "ghi", "def"}, }, }, { - name: "swapFieldsWithNames", + name: "_fieldByName", record: "abc,def,ghi", want: [][3]string{ {"abc", "ghi", "def"}, }, }, + { + name: "_fieldNames", + record: "abc,def,ghi", + want: [][3]string{ + {"foo", "bar", "baz"}, + }, + }, { name: "_createRecord", record: "abc,def,ghi", diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua index ae5d8253..b1619852 100644 --- a/filter/testdata/lua_test.lua +++ b/filter/testdata/lua_test.lua @@ -1,4 +1,4 @@ -function swapFieldsWithIndex(rec, next) +function swapFields(rec, next) local f1, f2 f1 = rec:get(1) rec:set(1, rec:get(2)) @@ -6,14 +6,22 @@ function swapFieldsWithIndex(rec, next) next(rec) end -function swapFieldsWithNames(rec, next) +function _fieldByName(rec, next) local f1, f2 - f1 = rec:get(fieldNames["bar"]) - rec:set(1, rec:get(fieldNames["baz"])) + f1 = rec:get(fieldByName("bar")) + rec:set(1, rec:get(fieldByName("baz"))) rec:set(2, f1) next(rec) end +function _fieldNames(rec, next) + -- set each field to its name + rec:set(0, fieldNames[0]) + rec:set(1, fieldNames[1]) + rec:set(2, 
fieldNames[2]) + next(rec) +end + function _createRecord(rec, next) newrec = createRecord() newrec:set(0, "hey") From c637d7539e5056ba6b92c834e047130970c172c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sun, 13 Dec 2020 22:44:30 +0100 Subject: [PATCH 11/21] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d2017f8..561a229f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add the `Slice` filter [#175](https://github.com/AdRoll/baker/pull/175) +- Add LUA filter [#110](https://github.com/AdRoll/baker/pull/110) ### Changed From 01171024fdcd6e3ce7cf531546d267fc83362de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Sat, 26 Dec 2020 21:09:35 +0100 Subject: [PATCH 12/21] filter: lua: cosmetics --- filter/lua.go | 33 +++++++++++++++++++++++++++++++++ filter/lua_test.go | 2 +- filter/testdata/lua_test.lua | 6 +++--- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 45f2eab3..f8c813fb 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -5,6 +5,7 @@ import ( "runtime" "github.com/AdRoll/baker" + lua "github.com/yuin/gopher-lua" ) @@ -16,11 +17,13 @@ var LUADesc = baker.FilterDesc{ Help: `Run a baker filter defined in a lua script`, } +// LUAConfig holds the configuration for the LUA filter. type LUAConfig struct { Script string `help:"Path to the lua script where the baker filter is defined" required:"true"` FilterName string `help:"Name of the lua function to run as baker filter" required:"true"` } +// LUA allows to run a baker filter from an external script written in lua. 
type LUA struct { l *lua.LState // lua state used during all the baker filter lifetime ud *lua.LUserData // pre-allocated (reused) userdata for the processed record @@ -29,6 +32,7 @@ type LUA struct { next func(baker.Record) } +// NewLUA returns a new LUA filter. func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { dcfg := cfg.DecodedConfig.(*LUAConfig) @@ -69,9 +73,20 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { return f, nil } +// registerLUATypes registers, in the given lua state, some lua types +// and utility functions useful to run a baker filter: +// - the record type +// - createRecord function (creates and returns a new record) +// - validateRecord function (takes a record and returns a boolean and a +// number), see baker.ComponentParams.ValidateRecord +// - fieldByName function (returns a field index given its name, or nil +// if the given field name doesn't exist) +// - fieldNames, an lua table where field names are indexed by their field +// field indexes func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { registerLUARecordType(l) + // Registers the 'createRecord' lua function. l.SetGlobal("createRecord", l.NewFunction(func(L *lua.LState) int { rec := comp.CreateRecord() ud := recordToLua(l, rec) @@ -106,8 +121,14 @@ func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { l.SetGlobal("fieldNames", fields) } +// TODO: at the moment LUA filter doesn't publish stats. +// There are multiple ways to do it, either require the filter to update +// the numbers of processed and filtered records, or deduce them automatically +// by hooking into next and Process functions (if that proves too costly +// this could be disabled in configuration. func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } +// Process forwards records to the lua-written filter. 
func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { // Modify the record inside the pre-allocated user value t.ud.Value = &luaRecord{r: rec} @@ -122,6 +143,8 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { Protect: true, }, t.ud, t.luaNext) + // TODO: should not panic here and instead increment a filter-specific + // metric that tracks the number of lua runtime errors. if err != nil { panic(err) } @@ -131,12 +154,15 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { const luaRecordTypeName = "record" +// registers the 'record' type into the given lua state. func registerLUARecordType(L *lua.LState) { mt := L.NewTypeMetatable(luaRecordTypeName) L.SetGlobal(luaRecordTypeName, mt) L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), luaRecordMethods)) } +// converts a baker.Record to lua user data, suitable to be pushed onto +// an lua stack. func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { ud := L.NewUserData() ud.Value = &luaRecord{r: r} @@ -144,6 +170,9 @@ func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { return ud } +// checks that the element at current stack index n is an lua +// user data, holding an luaRecord, and returns it. Raises an lua +// runtime error if the element is not a record. func checkLuaRecord(L *lua.LState, n int) *luaRecord { ud := L.CheckUserData(n) if v, ok := ud.Value.(*luaRecord); ok { @@ -153,10 +182,13 @@ func checkLuaRecord(L *lua.LState, n int) *luaRecord { return nil } +// faster version of checkLuaRecord that panics if the stack element +// is not a record, rather than raising an lua runtime error. func fastcheckLuaRecord(L *lua.LState, n int) *luaRecord { return L.Get(n).(*lua.LUserData).Value.(*luaRecord) } +// holds the lua-bound methods of baker.Record. 
var luaRecordMethods = map[string]lua.LGFunction{ "get": luaRecordGet, "set": luaRecordSet, @@ -164,6 +196,7 @@ var luaRecordMethods = map[string]lua.LGFunction{ "clear": luaRecordClear, } +// lua wrapper over a baker Record. type luaRecord struct { r baker.Record } diff --git a/filter/lua_test.go b/filter/lua_test.go index a8003bf6..e65c4360 100644 --- a/filter/lua_test.go +++ b/filter/lua_test.go @@ -25,7 +25,7 @@ end if err != nil { b.Fatal(err) } - // fname := filepath.Join(b.TempDir(), "filters.lua") + fname := filepath.Join(dir, "filters.lua") if err := ioutil.WriteFile(fname, []byte(script), os.ModePerm); err != nil { b.Fatalf("can't write lua script: %v", err) diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua index b1619852..e39c9439 100644 --- a/filter/testdata/lua_test.lua +++ b/filter/testdata/lua_test.lua @@ -1,7 +1,7 @@ +-- use record:get and record:set to swap 2 fields function swapFields(rec, next) - local f1, f2 - f1 = rec:get(1) - rec:set(1, rec:get(2)) + local f1, f2 = rec:get(1), rec:get(2) + rec:set(1, f2) rec:set(2, f1) next(rec) end From f6bd7292030acf9d51e0a286a8f40951244a3db2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Wed, 3 Mar 2021 19:37:13 +0100 Subject: [PATCH 13/21] filter: lua: improve filter documentation --- filter/lua.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/filter/lua.go b/filter/lua.go index f8c813fb..0b55988e 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -9,12 +9,111 @@ import ( lua "github.com/yuin/gopher-lua" ) +var luaHelp = ` +This filter runs a baker filter defined in a LUA script. +It's useful to quickly write and run a Baker filter without having to recompile Baker. + +This filter is based on [GopherLua](github.com/yuin/gopher-lua), which is an LUA5.1 virtual machine. + +To use this filter you need to declare a function in an lua file. 
This function serves the same purpose +as the equivalent ` + ticks("baker.Filter.Process") + ` method one would write for a Go filter. + +This is a simple filter which writes "hey" to the field "f0" of every record: +` + startBlock("lua") + ` +-- rec is a record object to be processed +-- next is the function next(record) to forward, if you want to, the record into the filter chain. +function myFilter(rec, next) + rec:set(fieldByName("f0"), "hey") + next(rec) +end +` + endBlock() + ` + +### The ` + ticks("Record") + ` table + +The first argument received by your lua filter function is a Record table. The Record table is a surrogate +for its Go counterpart, ` + ticks("baker.Record") + `. The Lua Record table defines the following methods: + +#### get + +- Go: ` + ticks("Record.Get(FieldIndex) []byte") + ` +- lua: ` + ticks("record:get(idx) -> string") + ` + +Same as its Go counterpart ` + ticks("record::get") + ` takes an integer representing the field index and returns its value. +But, unlike in Go where the value is a ` + ticks("[]byte") + `, in lua it's returned as a string, to allow for fast prototyping. + +#### "set" + +- Go: ` + ticks("Record.Set(FieldIndex, []byte)") + ` +- lua: ` + ticks("record:set(idx, string)") + ` + +Same as its Go counterpart ` + ticks("record::set") + ` takes an integer representing the field index and the value to set. +But, unlike in Go where the value is a ` + ticks("[]byte") + `, in lua it's a string, to allow for fast prototyping. + +#### "copy" + +- Go: ` + ticks("Record.Copy() Record") + ` +- lua: ` + ticks("record:copy() -> record") + ` + +Calling ` + ticks("record::copy") + ` returns a new record, a deep-copy of the original. + +#### "clear" + +- Go: ` + ticks("Record.Clear()") + ` +- lua: ` + ticks("lua: record:clear()") + ` + +Calling ` + ticks("record::clear") + ` clears the records internal state, making all its fields empty. 
+ + +### Global functions + +#### createRecord + +- Go: ` + ticks("Components.CreateRecord() Record") + ` +- lua: ` + ticks("createRecord -> record") + ` + +` + ticks("createRecord") + ` is the lua equivalent of the ` + ticks("CreateRecord") + ` function passed to your filter during construction. +It allows to create a new Record instance. + +#### validateRecord + +- Go: ` + ticks("Components.ValidateRecord(Record) (bool, FieldIndex)") + ` +- lua: ` + ticks("validateRecord(record) -> (bool, int)") + ` + +` + ticks("validateRecord") + ` is the lua equivalent of the ` + ticks("ValidateRecord") + ` function passed to your filter during construction. +It validates a given record with respect to the validation function, returning a boolean indicating whether +the record is a valid one; if false, the returned integer indicates the index of the first invalid field it met. + +#### fieldByName + +- Go: ` + ticks("Components.FieldByName(string) (FieldIndex, bool)") + ` +- lua: ` + ticks("fieldByName(string) -> int|nil") + ` + +` + ticks("fieldByName") + ` is the lua equivalent of the ` + ticks("FieldByName") + ` function passed to your filter during construction. +It allows to lookup a field index by its name, returning the index or nil if no field exists with this name. + +### Global tables + +#### fieldNames + +- Go: ` + ticks("Components.FieldNames []string") + ` +- lua: ` + ticks("fieldNames") + ` + +` + ticks("fieldNames") + ` is an integed-indexed table, in other words an array, containing all field names, as ` + ticks("FieldNames") + ` in Go. +` + +// TODO(arl): ideally this functions should not be required, but writing +// markdown documentation in Go strings is tedious and error-prone. 
+ +func ticks(s string) string { return "`" + s + "`" } +func startBlock(lang string) string { return "```" + lang } +func endBlock() string { return "```" } + // LUADesc describes the LUA filter var LUADesc = baker.FilterDesc{ Name: "LUA", New: NewLUA, Config: &LUAConfig{}, - Help: `Run a baker filter defined in a lua script`, + Help: luaHelp, } // LUAConfig holds the configuration for the LUA filter. From 3e54ab7f7aad894c2490aae33b19ba78dc7e768a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Thu, 4 Mar 2021 15:00:52 +0100 Subject: [PATCH 14/21] filter: lua: remove limit on procs=1 --- filter/lua.go | 61 ++++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 0b55988e..2ff598f4 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -3,6 +3,7 @@ package filter import ( "fmt" "runtime" + "sync" "github.com/AdRoll/baker" @@ -124,11 +125,11 @@ type LUAConfig struct { // LUA allows to run a baker filter from an external script written in lua. type LUA struct { - l *lua.LState // lua state used during all the baker filter lifetime - ud *lua.LUserData // pre-allocated (reused) userdata for the processed record - luaFunc lua.LValue // lua filter function - luaNext *lua.LFunction // lua next function (reused) - next func(baker.Record) + mu sync.Mutex + l *lua.LState // lua state used during all the baker filter lifetime + + luaFunc lua.LValue // lua filter function + recordMt lua.LValue } // NewLUA returns a new LUA filter. @@ -147,26 +148,15 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { return nil, fmt.Errorf("can't find lua filter %q in script %q", dcfg.FilterName, dcfg.Script) } - // Preallocate the userdata we use to wrap the record passed to the filter. - // We can do this since a single instance of a baker filter is only ever - // processing a single record at a time, so we can reuse the lua userdata - // structure for it. 
This reduces allocations. - ud := l.NewUserData() - l.SetMetatable(ud, l.GetTypeMetatable(luaRecordTypeName)) - - f := &LUA{} - - // Preallocate the lua next function passed to the filter. - luaNext := l.NewFunction(func(L *lua.LState) int { - f.next(fastcheckLuaRecord(L, 1).r) - return 0 - }) - - f.l = l - f.ud = ud - f.luaNext = luaNext - f.luaFunc = luaFunc + f := &LUA{ + l: l, + recordMt: l.GetTypeMetatable(luaRecordTypeName), + luaFunc: luaFunc, + } + // Since a filter has no way to know when it's deallocated we set a + // finaliser on the lua state instance, which gives us the occasion to close + // it. runtime.SetFinalizer(f, func(f *LUA) { f.l.Close() }) return f, nil @@ -229,18 +219,27 @@ func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } // Process forwards records to the lua-written filter. func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { - // Modify the record inside the pre-allocated user value - t.ud.Value = &luaRecord{r: rec} - // Set the next function which is called by the lua filter to the one - // we just received. - t.next = next + t.mu.Lock() + defer t.mu.Unlock() + + nextCalled := false + luaNext := t.l.NewFunction(func(L *lua.LState) int { + next(fastcheckLuaRecord(L, 1).r) + nextCalled = true + return 0 + }) + + // Wrap the incoming record into an lua user data having the 'record' type meta-table. + ud := t.l.NewUserData() + t.l.SetMetatable(ud, t.recordMt) + ud.Value = &luaRecord{r: rec} err := t.l.CallByParam(lua.P{ Fn: t.luaFunc, NRet: 0, Protect: true, - }, t.ud, t.luaNext) + }, ud, luaNext) // TODO: should not panic here and instead increment a filter-specific // metric that tracks the number of lua runtime errors. 
@@ -265,6 +264,8 @@ func registerLUARecordType(L *lua.LState) { func recordToLua(L *lua.LState, r baker.Record) *lua.LUserData { ud := L.NewUserData() ud.Value = &luaRecord{r: r} + // TODO(arl) record type metatable likely never changes so we could avoid + // the extra lookup here L.SetMetatable(ud, L.GetTypeMetatable(luaRecordTypeName)) return ud } From 19e428611720de5716656bd959ad7a3e0d7bfbb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Thu, 4 Mar 2021 15:01:48 +0100 Subject: [PATCH 15/21] output: lua: implement Stats() method --- filter/lua.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 2ff598f4..4fbd8faa 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -4,6 +4,7 @@ import ( "fmt" "runtime" "sync" + "sync/atomic" "github.com/AdRoll/baker" @@ -130,6 +131,8 @@ type LUA struct { luaFunc lua.LValue // lua filter function recordMt lua.LValue + + nprocessed, nfiltered int64 } // NewLUA returns a new LUA filter. @@ -210,15 +213,16 @@ func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { l.SetGlobal("fieldNames", fields) } -// TODO: at the moment LUA filter doesn't publish stats. -// There are multiple ways to do it, either require the filter to update -// the numbers of processed and filtered records, or deduce them automatically -// by hooking into next and Process functions (if that proves too costly -// this could be disabled in configuration. -func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{} } +func (t *LUA) Stats() baker.FilterStats { + return baker.FilterStats{ + NumProcessedLines: atomic.LoadInt64(&t.nprocessed), + NumFilteredLines: atomic.LoadInt64(&t.nfiltered), + } +} // Process forwards records to the lua-written filter. 
func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { + atomic.AddInt64(&t.nprocessed, 1) t.mu.Lock() defer t.mu.Unlock() @@ -246,6 +250,10 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { if err != nil { panic(err) } + + if !nextCalled { + atomic.AddInt64(&t.nfiltered, 1) + } } // lua record methods From 7b3fd953a6c3129db3fdab1ed86534d81dd5e2ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Thu, 4 Mar 2021 15:03:55 +0100 Subject: [PATCH 16/21] Cosmetics --- filter/lua.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 4fbd8faa..4ac91dff 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -129,10 +129,12 @@ type LUA struct { mu sync.Mutex l *lua.LState // lua state used during all the baker filter lifetime - luaFunc lua.LValue // lua filter function - recordMt lua.LValue + // These do not need protection against concurrent calls as they're strictly + // read-only. + luaProcess lua.LValue // lua filter function + recordMt lua.LValue // lua record type meta table - nprocessed, nfiltered int64 + nprocessed, nfiltered int64 // for filter stats } // NewLUA returns a new LUA filter. 
@@ -152,9 +154,9 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { } f := &LUA{ - l: l, - recordMt: l.GetTypeMetatable(luaRecordTypeName), - luaFunc: luaFunc, + l: l, + recordMt: l.GetTypeMetatable(luaRecordTypeName), + luaProcess: luaFunc, } // Since a filter has no way to know when it's deallocated we set a @@ -240,7 +242,7 @@ func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { ud.Value = &luaRecord{r: rec} err := t.l.CallByParam(lua.P{ - Fn: t.luaFunc, + Fn: t.luaProcess, NRet: 0, Protect: true, }, ud, luaNext) From 73224ef6d4d547c3f3b8a3cd83fdf73939629d3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Thu, 4 Mar 2021 16:27:27 +0100 Subject: [PATCH 17/21] filter: lua: address cosmetic review comments --- filter/lua.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 4ac91dff..bf78e20f 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -61,7 +61,7 @@ Calling ` + ticks("record::copy") + ` returns a new record, a deep-copy of the o #### "clear" - Go: ` + ticks("Record.Clear()") + ` -- lua: ` + ticks("lua: record:clear()") + ` +- lua: ` + ticks("record:clear()") + ` Calling ` + ticks("record::clear") + ` clears the records internal state, making all its fields empty. @@ -71,7 +71,7 @@ Calling ` + ticks("record::clear") + ` clears the records internal state, making #### createRecord - Go: ` + ticks("Components.CreateRecord() Record") + ` -- lua: ` + ticks("createRecord -> record") + ` +- lua: ` + ticks("createRecord() -> record") + ` ` + ticks("createRecord") + ` is the lua equivalent of the ` + ticks("CreateRecord") + ` function passed to your filter during construction. It allows to create a new Record instance. 
@@ -100,7 +100,7 @@ It allows to lookup a field index by its name, returning the index or nil if no - Go: ` + ticks("Components.FieldNames []string") + ` - lua: ` + ticks("fieldNames") + ` -` + ticks("fieldNames") + ` is an integed-indexed table, in other words an array, containing all field names, as ` + ticks("FieldNames") + ` in Go. +` + ticks("fieldNames") + ` is an integer-indexed table, in other words an array, containing all field names, as ` + ticks("FieldNames") + ` in Go. ` // TODO(arl): ideally this functions should not be required, but writing @@ -150,7 +150,7 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { luaFunc := l.GetGlobal(dcfg.FilterName) if luaFunc.Type() == lua.LTNil { - return nil, fmt.Errorf("can't find lua filter %q in script %q", dcfg.FilterName, dcfg.Script) + return nil, fmt.Errorf("can't find lua filter function name %q in script %q", dcfg.FilterName, dcfg.Script) } f := &LUA{ From ab5f2874245b82f81ac0c5008a862bac783627e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Fri, 29 Oct 2021 13:09:16 +0200 Subject: [PATCH 18/21] filter: remove NumProcessedLines --- filter/lua.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index bf78e20f..79faf500 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -217,8 +217,7 @@ func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { func (t *LUA) Stats() baker.FilterStats { return baker.FilterStats{ - NumProcessedLines: atomic.LoadInt64(&t.nprocessed), - NumFilteredLines: atomic.LoadInt64(&t.nfiltered), + NumFilteredLines: atomic.LoadInt64(&t.nfiltered), } } From 1ebd394b8599c8e3dbf51de06dc2563921d733d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 15 Nov 2021 18:55:23 +0100 Subject: [PATCH 19/21] bump lua --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index fe61d1be..b0d7ac42 100644 --- a/go.mod +++ b/go.mod 
@@ -22,7 +22,7 @@ require ( github.com/valyala/gozstd v1.9.0 github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f github.com/yuin/goldmark v1.2.1 // indirect - github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da + github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect ) diff --git a/go.sum b/go.sum index e37c7599..4941f896 100644 --- a/go.sum +++ b/go.sum @@ -151,8 +151,8 @@ github.com/vmware/vmware-go-kcl v0.0.0-20210126043010-022ec8d9de8f/go.mod h1:a5S github.com/yuin/goldmark v1.2.0/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= -github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= +github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= +github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= From 4fe56235840b8e92e516daf4e9636f2a8e7ed8f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 15 Nov 2021 19:40:15 +0100 Subject: [PATCH 20/21] filter: lua: simplify process api Lua function just receives a lua Record, and must return 2 values: - the first is a boolean indicating whether is to be kept or discarded - the second can be nil or a string indicating the error for this record --- filter/lua.go | 
60 +++++++++++++----------------------- filter/testdata/lua_test.lua | 52 +++++++++---------------------- 2 files changed, 36 insertions(+), 76 deletions(-) diff --git a/filter/lua.go b/filter/lua.go index 79faf500..ed5fdc98 100644 --- a/filter/lua.go +++ b/filter/lua.go @@ -2,6 +2,7 @@ package filter import ( "fmt" + "log" "runtime" "sync" "sync/atomic" @@ -126,6 +127,8 @@ type LUAConfig struct { // LUA allows to run a baker filter from an external script written in lua. type LUA struct { + nfiltered int64 // for filter stats + mu sync.Mutex l *lua.LState // lua state used during all the baker filter lifetime @@ -134,7 +137,6 @@ type LUA struct { luaProcess lua.LValue // lua filter function recordMt lua.LValue // lua record type meta table - nprocessed, nfiltered int64 // for filter stats } // NewLUA returns a new LUA filter. @@ -180,14 +182,6 @@ func NewLUA(cfg baker.FilterParams) (baker.Filter, error) { func registerLUATypes(l *lua.LState, comp baker.ComponentParams) { registerLUARecordType(l) - // Registers the 'createRecord' lua function. - l.SetGlobal("createRecord", l.NewFunction(func(L *lua.LState) int { - rec := comp.CreateRecord() - ud := recordToLua(l, rec) - L.Push(ud) - return 1 - })) - l.SetGlobal("validateRecord", l.NewFunction(func(L *lua.LState) int { luar := fastcheckLuaRecord(l, 1) ok, fidx := comp.ValidateRecord(luar.r) @@ -221,40 +215,42 @@ func (t *LUA) Stats() baker.FilterStats { } } +// TODO(arl) Look at https://github.com/yuin/gopher-lua#the-lstate-pool-pattern + // Process forwards records to the lua-written filter. func (t *LUA) Process(rec baker.Record, next func(baker.Record)) { - atomic.AddInt64(&t.nprocessed, 1) - - t.mu.Lock() - defer t.mu.Unlock() - - nextCalled := false - luaNext := t.l.NewFunction(func(L *lua.LState) int { - next(fastcheckLuaRecord(L, 1).r) - nextCalled = true - return 0 - }) - // Wrap the incoming record into an lua user data having the 'record' type meta-table. 
+ t.mu.Lock() ud := t.l.NewUserData() t.l.SetMetatable(ud, t.recordMt) ud.Value = &luaRecord{r: rec} err := t.l.CallByParam(lua.P{ Fn: t.luaProcess, - NRet: 0, + NRet: 2, Protect: true, - }, ud, luaNext) + }, ud) + t.mu.Unlock() - // TODO: should not panic here and instead increment a filter-specific - // metric that tracks the number of lua runtime errors. if err != nil { + // this should cause panic at runtime panic(err) } - if !nextCalled { + keep := t.l.Get(-2).(lua.LBool) + luaErr := t.l.Get(-1) + t.l.Pop(2) + if luaErr != lua.LNil { + log.Printf("error from lua function: %v", luaErr.(lua.LString)) + return + } + + if !keep { atomic.AddInt64(&t.nfiltered, 1) + return } + + next(rec) } // lua record methods @@ -301,7 +297,6 @@ func fastcheckLuaRecord(L *lua.LState, n int) *luaRecord { var luaRecordMethods = map[string]lua.LGFunction{ "get": luaRecordGet, "set": luaRecordSet, - "copy": luaRecordCopy, "clear": luaRecordClear, } @@ -332,17 +327,6 @@ func luaRecordSet(L *lua.LState) int { return 0 } -// record:copy() record -func luaRecordCopy(L *lua.LState) int { - luar := fastcheckLuaRecord(L, 1) - - cpy := luar.r.Copy() - ud := recordToLua(L, cpy) - L.Push(ud) - - return 1 -} - // record:clear() func luaRecordClear(L *lua.LState) int { luar := fastcheckLuaRecord(L, 1) diff --git a/filter/testdata/lua_test.lua b/filter/testdata/lua_test.lua index e39c9439..6e42bd29 100644 --- a/filter/testdata/lua_test.lua +++ b/filter/testdata/lua_test.lua @@ -1,56 +1,32 @@ --- use record:get and record:set to swap 2 fields -function swapFields(rec, next) +function swapFields(rec) local f1, f2 = rec:get(1), rec:get(2) rec:set(1, f2) rec:set(2, f1) - next(rec) + return true, nil end -function _fieldByName(rec, next) +function errorFromLua(rec) + return false, "error from lua" +end + + +function _fieldByName(rec) local f1, f2 f1 = rec:get(fieldByName("bar")) rec:set(1, rec:get(fieldByName("baz"))) rec:set(2, f1) - next(rec) + return true, nil end -function _fieldNames(rec, next) 
+function _fieldNames(rec) -- set each field to its name rec:set(0, fieldNames[0]) rec:set(1, fieldNames[1]) rec:set(2, fieldNames[2]) - next(rec) + return true, nil end -function _createRecord(rec, next) - newrec = createRecord() - newrec:set(0, "hey") - newrec:set(1, "ho") - newrec:set(2, "let's go!") - next(newrec) - next(rec) -end - -function _validateRecord(rec, next) - ok, idx = validateRecord(rec) - if ok == false and idx == 0 then - rec:set(0, "good") - else - rec:set(0, "bad") - end - next(rec) -end - -function clearRecord(rec, next) +function clearRecord(rec) rec:clear() - next(rec) -end - -function copyRecord(rec, next) - rec:set(2, "1") - cpy = rec:copy() - next(rec) - - cpy:set(2, "2") - next(cpy) -end + return true, nil +end \ No newline at end of file From 341f08ba00f08f5a5ccac79a2aaf9b07f1eccbc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= Date: Mon, 15 Nov 2021 20:01:12 +0100 Subject: [PATCH 21/21] filter: lua: fix tests --- filter/lua_test.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/filter/lua_test.go b/filter/lua_test.go index e65c4360..8cbbde0c 100644 --- a/filter/lua_test.go +++ b/filter/lua_test.go @@ -107,21 +107,6 @@ func TestLUAFilter(t *testing.T) { {"foo", "bar", "baz"}, }, }, - { - name: "_createRecord", - record: "abc,def,ghi", - want: [][3]string{ - {"hey", "ho", "let's go!"}, - {"abc", "def", "ghi"}, - }, - }, - { - name: "_validateRecord", - record: "ciao,,", - want: [][3]string{ - {"good", "", ""}, - }, - }, { name: "clearRecord", record: "foo,bar,baz", @@ -129,14 +114,6 @@ func TestLUAFilter(t *testing.T) { {"", "", ""}, }, }, - { - name: "copyRecord", - record: "foo,bar,baz", - want: [][3]string{ - {"foo", "bar", "1"}, - {"foo", "bar", "2"}, - }, - }, // error cases {