Skip to content
This repository was archived by the owner on Feb 7, 2025. It is now read-only.

Commit b759013

Browse files
committed
fixed issues
1 parent 279ac54 commit b759013

4 files changed

Lines changed: 39 additions & 10 deletions

File tree

controllers/task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func (ctx *taskContext) run(c *gin.Context) {
107107
opts := &interfaces.SpiderRunOptions{
108108
Mode: t.Mode,
109109
NodeIds: t.NodeIds,
110+
Cmd: t.Cmd,
110111
Param: t.Param,
111112
Priority: t.Priority,
112113
}

grpc/server/message_server.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,31 @@ func (svr MessageServer) Connect(stream grpc.MessageService_ConnectServer) (err
2828
finished := make(chan bool)
2929
for {
3030
msg, err := stream.Recv()
31+
nodeKey := "unknown node key"
32+
if msg != nil {
33+
nodeKey = msg.NodeKey
34+
}
3135
if err == io.EOF {
32-
log.Infof("[MessageServer] received signal EOF from node[%s], now quit", msg.NodeKey)
36+
log.Infof("[MessageServer] received signal EOF from node[%s], now quit", nodeKey)
3337
return nil
3438
}
3539
if err != nil {
36-
log.Errorf("[MessageServer] receiving message error from node[%s]: %v", msg.NodeKey, err)
40+
log.Errorf("[MessageServer] receiving message error from node[%s]: %v", nodeKey, err)
3741
return err
3842
}
3943
switch msg.Code {
4044
case grpc.StreamMessageCode_CONNECT:
41-
log.Infof("[MessageServer] received connect request from node[%s], key: %s", msg.NodeKey, msg.Key)
45+
log.Infof("[MessageServer] received connect request from node[%s], key: %s", nodeKey, msg.Key)
4246
svr.server.SetSubscribe(msg.Key, &entity.GrpcSubscribe{
4347
Stream: stream,
4448
Finished: finished,
4549
})
4650
case grpc.StreamMessageCode_DISCONNECT:
47-
log.Infof("[MessageServer] received disconnect request from node[%s], key: %s", msg.NodeKey, msg.Key)
51+
log.Infof("[MessageServer] received disconnect request from node[%s], key: %s", nodeKey, msg.Key)
4852
svr.server.DeleteSubscribe(msg.Key)
4953
return nil
5054
case grpc.StreamMessageCode_SEND:
51-
log.Debugf("[MessageServer] received send request from node[%s] to %s", msg.NodeKey, msg.To)
55+
log.Debugf("[MessageServer] received send request from node[%s] to %s", nodeKey, msg.To)
5256
sub, err := svr.server.GetSubscribe(msg.To)
5357
if err != nil {
5458
return err

grpc/server/server.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"github.com/apex/log"
67
config2 "github.com/crawlab-team/crawlab-core/config"
78
"github.com/crawlab-team/crawlab-core/constants"
@@ -180,12 +181,13 @@ func (svr *Server) IsStopped() (res bool) {
180181
return svr.stopped
181182
}
182183

184+
func (svr *Server) recoveryHandlerFunc(p interface{}) (err error) {
185+
err = errors.NewError(errors.ErrorPrefixGrpc, fmt.Sprintf("%v", p))
186+
trace.PrintError(err)
187+
return err
188+
}
189+
183190
func NewServer(opts ...Option) (svr2 interfaces.GrpcServer, err error) {
184-
// recovery options
185-
var recoveryFunc grpc_recovery.RecoveryHandlerFunc
186-
recoveryOpts := []grpc_recovery.Option{
187-
grpc_recovery.WithRecoveryHandler(recoveryFunc),
188-
}
189191

190192
// server
191193
svr := &Server{
@@ -244,6 +246,11 @@ func NewServer(opts ...Option) (svr2 interfaces.GrpcServer, err error) {
244246
return nil, err
245247
}
246248

249+
// recovery options
250+
recoveryOpts := []grpc_recovery.Option{
251+
grpc_recovery.WithRecoveryHandler(svr.recoveryHandlerFunc),
252+
}
253+
247254
// grpc server
248255
svr.svr = grpc.NewServer(
249256
grpc_middleware.WithUnaryServerChain(

spider/admin/service.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,23 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp
7171
UserId: opts.UserId,
7272
}
7373

74+
// normalize
75+
if mainTask.Mode == "" {
76+
mainTask.Mode = s.Mode
77+
}
78+
if mainTask.NodeIds == nil {
79+
mainTask.NodeIds = s.NodeIds
80+
}
81+
if mainTask.Cmd == "" {
82+
mainTask.Cmd = s.Cmd
83+
}
84+
if mainTask.Param == "" {
85+
mainTask.Param = s.Param
86+
}
87+
if mainTask.Priority == 0 {
88+
mainTask.Priority = s.Priority
89+
}
90+
7491
if svc.isMultiTask(opts) {
7592
// multi tasks
7693
// TODO: implement associated tasks

0 commit comments

Comments
 (0)