Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin/driver/driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,9 @@ func (c *PluginDriverInterface) TimeOutCommit() (LastSuccessCommitData *PluginDa
func (c *PluginDriverInterface) Skip(SkipData *PluginDataType) error {
return nil
}

// Filter 是可选过滤钩子,插件可按需实现字段动态增删等逻辑。
// keep=false 表示过滤掉当前事件;keep=true 且返回非nil data 表示继续同步。
func (c *PluginDriverInterface) Filter(data *PluginDataType, retry bool) (newData *PluginDataType, keep bool, err error) {
return data, true, nil
}
9 changes: 9 additions & 0 deletions plugin/driver/filter_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package driver

// FilterPluginDriver 是可选接口。
// 插件实现后可在写入目标端前对事件做动态字段增删、改写或过滤。
type FilterPluginDriver interface {
// keep=false 表示忽略当前事件;keep=true 表示继续同步。
// 当 keep=true 时,newData 不能为空。
Filter(data *PluginDataType, retry bool) (newData *PluginDataType, keep bool, err error)
}
28 changes: 28 additions & 0 deletions server/to_server_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,27 @@ func (This *ToServer) getPluginAndSetParam(MyConsumerId int) (PluginConn *plugin
return
}

func (This *ToServer) applyPluginFilter(conn pluginDriver.Driver, data *pluginDriver.PluginDataType, retry bool) (newData *pluginDriver.PluginDataType, keep bool, err error) {
if data == nil {
return data, true, nil
}
filterPlugin, ok := conn.(pluginDriver.FilterPluginDriver)
if !ok {
return data, true, nil
}
newData, keep, err = filterPlugin.Filter(data, retry)
if err != nil {
return data, keep, err
}
if keep == false {
return data, false, nil
}
if newData == nil {
return data, false, fmt.Errorf("filter plugin return nil data while keep=true")
}
return newData, true, nil
}

func (This *ToServer) timeOutCommit(MyConsumerId int) (LastSuccessCommitData *pluginDriver.PluginDataType, ErrData *pluginDriver.PluginDataType, err error) {
defer func() {
if err2 := recover(); err2 != nil {
Expand Down Expand Up @@ -644,6 +665,13 @@ func (This *ToServer) sendToServer(paramData *pluginDriver.PluginDataType, MyCon
return lastSuccessCommitData, data, err
}
defer plugin.BackPlugin(PluginConn)
data, b, err = This.applyPluginFilter(PluginConn.GetConn(), data, retry)
if err != nil {
return lastSuccessCommitData, data, err
}
if b == false {
return paramData, nil, nil
}

switch data.EventType {
case "insert":
Expand Down
184 changes: 184 additions & 0 deletions server/to_server_consume_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package server

import (
"errors"
pluginDriver "github.com/brokercap/Bifrost/plugin/driver"
"testing"
)

type mockBasePluginDriver struct{}

func (m *mockBasePluginDriver) SetOption(uri *string, param map[string]interface{}) {}

func (m *mockBasePluginDriver) Open() error {
return nil
}

func (m *mockBasePluginDriver) Close() bool {
return true
}

func (m *mockBasePluginDriver) GetUriExample() string {
return ""
}

func (m *mockBasePluginDriver) CheckUri() error {
return nil
}

func (m *mockBasePluginDriver) Insert(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return data, nil, nil
}

func (m *mockBasePluginDriver) Update(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return data, nil, nil
}

func (m *mockBasePluginDriver) Del(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return data, nil, nil
}

func (m *mockBasePluginDriver) Query(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return data, nil, nil
}

func (m *mockBasePluginDriver) Commit(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return data, nil, nil
}

func (m *mockBasePluginDriver) SetParam(p interface{}) (interface{}, error) {
return p, nil
}

func (m *mockBasePluginDriver) TimeOutCommit() (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType, error) {
return nil, nil, nil
}

func (m *mockBasePluginDriver) Skip(data *pluginDriver.PluginDataType) error {
return nil
}

type mockFilterPluginDriver struct {
mockBasePluginDriver
filterFn func(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error)
}

func (m *mockFilterPluginDriver) Filter(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error) {
return m.filterFn(data, retry)
}

func TestToServerApplyPluginFilterPassThrough(t *testing.T) {
s := &ToServer{}
input := &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{
{
"id": 1,
"name": "alice",
},
},
}
output, keep, err := s.applyPluginFilter(&mockBasePluginDriver{}, input, false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !keep {
t.Fatalf("expected keep=true")
}
if output != input {
t.Fatalf("expected same data pointer for pass-through")
}
}

func TestToServerApplyPluginFilterDynamicFields(t *testing.T) {
s := &ToServer{}
input := &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{
{
"id": 1,
"name": "alice",
"drop_field": "x",
},
},
}
driver := &mockFilterPluginDriver{
filterFn: func(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error) {
row := map[string]interface{}{
"id": data.Rows[0]["id"],
"name": data.Rows[0]["name"],
"new_field": "from-filter",
}
newData := *data
newData.Rows = []map[string]interface{}{row}
return &newData, true, nil
},
}
output, keep, err := s.applyPluginFilter(driver, input, false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !keep {
t.Fatalf("expected keep=true")
}
if _, ok := output.Rows[0]["drop_field"]; ok {
t.Fatalf("drop_field should be removed by filter plugin")
}
if output.Rows[0]["new_field"] != "from-filter" {
t.Fatalf("new_field not added by filter plugin")
}
}

func TestToServerApplyPluginFilterDropEvent(t *testing.T) {
s := &ToServer{}
input := &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{{"id": 1}},
}
driver := &mockFilterPluginDriver{
filterFn: func(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error) {
return nil, false, nil
},
}
_, keep, err := s.applyPluginFilter(driver, input, false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if keep {
t.Fatalf("expected keep=false")
}
}

func TestToServerApplyPluginFilterKeepTrueButNilData(t *testing.T) {
s := &ToServer{}
input := &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{{"id": 1}},
}
driver := &mockFilterPluginDriver{
filterFn: func(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error) {
return nil, true, nil
},
}
_, _, err := s.applyPluginFilter(driver, input, false)
if err == nil {
t.Fatalf("expected error when keep=true but data=nil")
}
}

func TestToServerApplyPluginFilterReturnError(t *testing.T) {
s := &ToServer{}
input := &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{{"id": 1}},
}
driver := &mockFilterPluginDriver{
filterFn: func(data *pluginDriver.PluginDataType, retry bool) (*pluginDriver.PluginDataType, bool, error) {
return data, true, errors.New("filter failed")
},
}
_, _, err := s.applyPluginFilter(driver, input, false)
if err == nil {
t.Fatalf("expected filter error")
}
}