Skip to content

Commit 90fc2d5

Browse files
authored
feat(portable): plugin error message for func (#3305)
Signed-off-by: Jiyong Huang <[email protected]>
1 parent e19c37a commit 90fc2d5

File tree

3 files changed

+20
-4
lines changed

3 files changed

+20
-4
lines changed

internal/plugin/portable/runtime/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
151151
}
152152
}
153153
if err != nil {
154-
return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
154+
return nil, err
155155
}
156156
result, e := r.sock.Recv()
157157
if e != nil {

internal/plugin/portable/runtime/function.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ package runtime
1616

1717
import (
1818
"encoding/json"
19+
"errors"
1920
"fmt"
2021

2122
"github.com/lf-edge/ekuiper/contract/v2/api"
23+
nerrors "go.nanomsg.org/mangos/v3/errors"
2224

2325
"github.com/lf-edge/ekuiper/v2/internal/conf"
2426
kctx "github.com/lf-edge/ekuiper/v2/internal/topo/context"
@@ -86,7 +88,8 @@ func (f *PortableFunc) Validate(args []interface{}) error {
8688
}
8789
res, err := f.dataCh.Req(jsonArg)
8890
if err != nil {
89-
return err
91+
e := handleTimeout(err, f.reg.Name)
92+
return e
9093
}
9194
fr := &FuncReply{}
9295
err = json.Unmarshal(res, fr)
@@ -112,7 +115,8 @@ func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, b
112115
}
113116
res, err := f.dataCh.Req(jsonArg)
114117
if err != nil {
115-
return err, false
118+
e := handleTimeout(err, f.reg.Name)
119+
return e, false
116120
}
117121
fr := &FuncReply{}
118122
err = json.Unmarshal(res, fr)
@@ -129,6 +133,19 @@ func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, b
129133
return fr.Result, fr.State
130134
}
131135

136+
func handleTimeout(err error, pname string) error {
137+
if errors.Is(err, nerrors.ErrRecvTimeout) {
138+
pm := GetPluginInsManager()
139+
status, ok := pm.GetPluginInsStatus(pname)
140+
if !ok {
141+
return fmt.Errorf("plugin %s was removed", pname)
142+
} else {
143+
return fmt.Errorf("time out, plugin %s status %s, message: %s", pname, status.Status, status.ErrMsg)
144+
}
145+
}
146+
return err
147+
}
148+
132149
func (f *PortableFunc) IsAggregate() bool {
133150
if f.isAgg > 0 {
134151
return f.isAgg > 1

internal/plugin/portable/runtime/plugin_ins_manager.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,6 @@ func (p *pluginInsManager) GetOrStartProcess(pluginMeta *PluginMeta, pconf *Port
301301
cmd.Stdout = conf.Log.Out
302302
cmd.Stderr = conf.Log.Out
303303
cmd.Dir = filepath.Dir(pluginMeta.Executable)
304-
305304
conf.Log.Println("plugin starting")
306305
err = cmd.Start()
307306
failpoint.Inject("cmdStartErr", func() {

0 commit comments

Comments
 (0)