Skip to content

Commit e1e5f46

Browse files
Feat: multi group (#56)
* fix: appgroup auto scale * fix bug: discover may panic --------- Co-authored-by: HuangXiaomeng <xiaom@apache.org>
1 parent 9356fe8 commit e1e5f46

11 files changed

Lines changed: 284 additions & 73 deletions

File tree

client.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,35 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
141141
return nil, fmt.Errorf("cannot get schedulerX discovery active server")
142142
}
143143

144-
// Init connection pool
145-
dialer := func() (net.Conn, error) {
146-
activeServer := serverDiscover.ActiveServer()
147-
logger.Infof("SchedulerX discovery active server addr=%s", activeServer)
148-
return net.DialTimeout("tcp", activeServer, time.Millisecond*500)
144+
// Init connection pool manager (per-groupId isolated connections)
145+
poolFactory := func(fctx context.Context, groupId string) pool.ConnPool {
146+
sd := discovery.GetDiscovery(groupId)
147+
changedCh := sd.ResultChangedCh()
148+
// Drain the signal from the initial refreshActiveServer call (addr "" → actual),
149+
// since the pool will dial the current active server directly on first Get().
150+
// Only future genuine addr changes should trigger reconnection.
151+
select {
152+
case <-changedCh:
153+
default:
154+
}
155+
dialer := func() (net.Conn, error) {
156+
activeServer := sd.ActiveServer()
157+
logger.Infof("SchedulerX discovery active server addr=%s, groupId=%s", activeServer, groupId)
158+
return net.DialTimeout("tcp", activeServer, time.Millisecond*500)
159+
}
160+
return pool.NewSingleConnPool(fctx, dialer,
161+
pool.WithPostDialer(remoting.Handshake),
162+
pool.WithAddrChangedSignalCh(changedCh))
149163
}
150-
pool.InitConnPool(ctx, dialer,
151-
pool.WithPostDialer(remoting.Handshake),
152-
pool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh()))
153-
if conn, err := pool.GetConnPool().Get(ctx); err != nil {
164+
pool.InitConnPoolManager(ctx, poolFactory,
165+
pool.WithDefaultGroup(cfg.GroupId),
166+
pool.WithOnNewPool(func(groupId string, p pool.ConnPool) {
167+
remoting.OnMsgReceived(ctx, p)
168+
}))
169+
170+
// Eagerly create the parent group's pool and verify connection
171+
parentPool := pool.GetConnPoolManager().GetOrCreate(cfg.GroupId)
172+
if conn, err := parentPool.Get(ctx); err != nil {
154173
return nil, fmt.Errorf("cannot connect schedulerx server, maybe network was broken, err=%s", err.Error())
155174
} else {
156175
logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())
@@ -168,10 +187,9 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
168187
stopChan := make(chan os.Signal, 1)
169188
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)
170189

171-
// Keep heartbeat, and receive message
190+
// Keep heartbeat (OnMsgReceived is started per-pool via the onNewPool callback above)
172191
// KeepHeartbeat must after init actors, so that can get actorSystemPort from actorSystem
173-
go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey, stopChan)
174-
go remoting.OnMsgReceived(ctx)
192+
go remoting.KeepHeartbeat(ctx, actorSystem, stopChan)
175193

176194
return &Client{
177195
cfg: cfg,

internal/actor/at_least_once_delivery_actor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
2323
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
24+
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
2425
"github.com/alibaba/schedulerx-worker-go/logger"
2526
)
2627

@@ -61,12 +62,14 @@ func (a *atLeastOnceDeliveryRoutingActor) Receive(actorCtx actor.Context) {
6162

6263
func (a *atLeastOnceDeliveryRoutingActor) handleReportInstanceStatusEvent(req *schedulerx.WorkerReportJobInstanceStatusRequest) {
6364
actorcomm.SxMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
65+
Ctx: pool.ConnPoolCtxForGroup(req.GetGroupId()),
6466
Msg: req,
6567
}
6668
}
6769

6870
func (a *atLeastOnceDeliveryRoutingActor) handleBatchReportTaskStatues(req *schedulerx.WorkerBatchReportTaskStatuesRequest) {
6971
actorcomm.SxMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
72+
Ctx: pool.ConnPoolCtxForGroup(req.GetGroupId()),
7073
Msg: req,
7174
}
7275
}

internal/actor/common/utils.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ func IsSchedulerxServer(pid *actor.PID) bool {
6060
}
6161

6262
func SchedulerxServerPid(ctx context.Context) *actor.PID {
63-
conn, err := pool.GetConnPool().Get(ctx)
63+
if ctx == nil {
64+
ctx = context.Background()
65+
}
66+
cp := pool.ConnPoolFromContext(ctx)
67+
if cp == nil {
68+
cp = pool.GetConnPool()
69+
}
70+
conn, err := cp.Get(ctx)
6471
if err != nil {
6572
return &actor.PID{}
6673
}

internal/actor/heartbeat_processor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package actor
1818

1919
import (
20+
"context"
2021
"fmt"
2122

2223
"github.com/asynkron/protoactor-go/actor"
@@ -44,6 +45,13 @@ func newHeartbeatProcessor(connpool pool.ConnPool) actor.Process {
4445
}
4546
}
4647

48+
func (p *heartbeatProcessor) getConnPool(ctx context.Context) pool.ConnPool {
49+
if cp := pool.ConnPoolFromContext(ctx); cp != nil {
50+
return cp
51+
}
52+
return p.connpool
53+
}
54+
4755
func (p *heartbeatProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
4856
if actorcomm.IsSchedulerxServer(pid) {
4957
var (
@@ -55,7 +63,7 @@ func (p *heartbeatProcessor) SendUserMessage(pid *actor.PID, message interface{}
5563
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
5664
return
5765
}
58-
conn, err := p.connpool.Get(wrappedMsg.Ctx)
66+
conn, err := p.getConnPool(wrappedMsg.Ctx).Get(wrappedMsg.Ctx)
5967
if err != nil {
6068
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
6169
return

internal/actor/job_instance_processor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package actor
1818

1919
import (
20+
"context"
2021
"fmt"
2122

2223
"github.com/asynkron/protoactor-go/actor"
@@ -44,6 +45,13 @@ func newJobInstanceProcessor(connpool pool.ConnPool) actor.Process {
4445
}
4546
}
4647

48+
func (p *jobInstanceProcessor) getConnPool(ctx context.Context) pool.ConnPool {
49+
if cp := pool.ConnPoolFromContext(ctx); cp != nil {
50+
return cp
51+
}
52+
return p.connpool
53+
}
54+
4755
func (p *jobInstanceProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
4856
if actorcomm.IsSchedulerxServer(pid) {
4957
var (
@@ -55,7 +63,7 @@ func (p *jobInstanceProcessor) SendUserMessage(pid *actor.PID, message interface
5563
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
5664
return
5765
}
58-
conn, err := p.connpool.Get(wrappedMsg.Ctx)
66+
conn, err := p.getConnPool(wrappedMsg.Ctx).Get(wrappedMsg.Ctx)
5967
if err != nil {
6068
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
6169
return

internal/actor/task_actor.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package actor
1818

1919
import (
20-
"context"
2120
"fmt"
2221
"time"
2322

@@ -91,7 +90,7 @@ func (a *taskActor) Receive(ctx actor.Context) {
9190
ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg)
9291
case *schedulerx.WorkerBatchUpdateTaskStatusRequest:
9392
// forward to server
94-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
93+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
9594
result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result()
9695
if err != nil {
9796
logger.Errorf("Send WorkerBatchUpdateTaskStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
@@ -100,7 +99,7 @@ func (a *taskActor) Receive(ctx actor.Context) {
10099
}
101100
case *schedulerx.WorkerQueryJobInstanceStatusRequest:
102101
// forward to server
103-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
102+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
104103
result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result()
105104
if err != nil {
106105
logger.Errorf("Send WorkerQueryJobInstanceStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
@@ -109,7 +108,7 @@ func (a *taskActor) Receive(ctx actor.Context) {
109108
}
110109
case *schedulerx.WorkerClearTasksRequest:
111110
// forward to server
112-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
111+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
113112
result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result()
114113
if err != nil {
115114
logger.Errorf("Send WorkerClearTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
@@ -118,7 +117,7 @@ func (a *taskActor) Receive(ctx actor.Context) {
118117
}
119118
case *schedulerx.WorkerBatchCreateTasksRequest:
120119
// forward to server
121-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
120+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
122121
result, err := ctx.RequestFuture(serverPid, msg, 90*time.Second).Result()
123122
if err != nil {
124123
logger.Errorf("Send WorkerBatchCreateTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
@@ -127,7 +126,7 @@ func (a *taskActor) Receive(ctx actor.Context) {
127126
}
128127
case *schedulerx.WorkerPullTasksRequest:
129128
// forward to server
130-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
129+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
131130
result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result()
132131
if err != nil {
133132
logger.Errorf("Send WorkerPullTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)
@@ -136,10 +135,10 @@ func (a *taskActor) Receive(ctx actor.Context) {
136135
}
137136
case *schedulerx.WorkerBatchReportTaskStatuesRequest:
138137
// forward to server
139-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
138+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
140139
ctx.Send(serverPid, msg)
141140
case *schedulerx.WorkerReportTaskListStatusRequest:
142-
serverPid := actorcomm.SchedulerxServerPid(context.Background())
141+
serverPid := actorcomm.SchedulerxServerPid(msg.Ctx)
143142
result, err := ctx.RequestFuture(serverPid, innerMsg, 30*time.Second).Result()
144143
if err != nil {
145144
logger.Errorf("Send WorkerReportTaskListStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address)

internal/actor/task_actor_processor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package actor
1818

1919
import (
20+
"context"
2021
"fmt"
2122

2223
"github.com/asynkron/protoactor-go/actor"
@@ -43,6 +44,13 @@ func newTaskProcessor(connpool pool.ConnPool) actor.Process {
4344
}
4445
}
4546

47+
func (p *taskProcessor) getConnPool(ctx context.Context) pool.ConnPool {
48+
if cp := pool.ConnPoolFromContext(ctx); cp != nil {
49+
return cp
50+
}
51+
return p.connpool
52+
}
53+
4654
func (p *taskProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
4755
if actorcomm.IsSchedulerxServer(pid) {
4856
var (
@@ -54,7 +62,7 @@ func (p *taskProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
5462
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
5563
return
5664
}
57-
conn, err := p.connpool.Get(wrappedMsg.Ctx)
65+
conn, err := p.getConnPool(wrappedMsg.Ctx).Get(wrappedMsg.Ctx)
5866
if err != nil {
5967
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
6068
return

internal/discovery/discover.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,55 @@ func (s *ServiceDiscover) queryActiveServer(groupId, appKey string) (string, err
118118
return "", fmt.Errorf("result is not success requestId:%s message:%s, url=%s", respData.RequestId, respData.Message, urlStr)
119119
}
120120

121+
// Data 可能是 string(旧版 API)或 map[string]interface{}(JSON 对象),需安全解析
122+
getLeaderAddr := func(data interface{}) (string, error) {
123+
switch v := data.(type) {
124+
case string:
125+
return v, nil
126+
case map[string]interface{}:
127+
for _, key := range []string{"currentLeaderAddr", "CurrentLeaderAddr", "leaderAddr", "LeaderAddr", "addr", "address"} {
128+
if val, ok := v[key]; ok {
129+
if s, ok := val.(string); ok {
130+
return s, nil
131+
}
132+
}
133+
}
134+
return "", fmt.Errorf("cannot find leader address in response data")
135+
default:
136+
return "", fmt.Errorf("unexpected Data type: %T", data)
137+
}
138+
}
139+
121140
if respData.Code != GroupHasChild {
122-
return respData.Data.(string), nil
141+
addr, err := getLeaderAddr(respData.Data)
142+
if err != nil {
143+
return "", err
144+
}
145+
return addr, nil
123146
}
124147

125148
// This application group has enabled automatic scaling and has split child nodes, requiring the parsing of all groupIds, and register serverDiscovery.
126149
var groupResult struct {
127150
CurrentLeaderAddr string
128151
GroupIdMap map[string]string // key=groupId, val=appKey
129152
}
130-
err = json.Unmarshal([]byte(respData.Data.(string)), &groupResult)
131-
if err != nil {
132-
return "", fmt.Errorf("unmarshal group result[%s] fail %w", respData.Data, err)
153+
switch v := respData.Data.(type) {
154+
case string:
155+
err = json.Unmarshal([]byte(v), &groupResult)
156+
if err != nil {
157+
return "", fmt.Errorf("unmarshal group result[%s] fail %w", respData.Data, err)
158+
}
159+
case map[string]interface{}:
160+
b, marshalErr := json.Marshal(v)
161+
if marshalErr != nil {
162+
return "", fmt.Errorf("marshal group result fail %w", marshalErr)
163+
}
164+
err = json.Unmarshal(b, &groupResult)
165+
if err != nil {
166+
return "", fmt.Errorf("unmarshal group result fail %w", err)
167+
}
168+
default:
169+
return "", fmt.Errorf("unexpected Data type: %T", respData.Data)
133170
}
134171

135172
for childGroupId, childAppKey := range groupResult.GroupIdMap {

internal/remoting/handler.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ import (
3434
"github.com/alibaba/schedulerx-worker-go/logger"
3535
)
3636

37-
func OnMsgReceived(ctx context.Context) {
38-
connpool := pool.GetConnPool()
37+
func OnMsgReceived(ctx context.Context, connpool pool.ConnPool) {
38+
// Embed the originating pool in the context so that downstream actors/processors
39+
// send responses back through the same connection (i.e. to the correct server).
40+
poolCtx := pool.WithConnPool(ctx, connpool)
3941

4042
var dataLen uint32
4143
hdrBuf := make([]byte, constants.TransportHeaderSize)
@@ -100,13 +102,13 @@ func OnMsgReceived(ctx context.Context) {
100102
continue
101103
}
102104
case *schedulerx.ServerSubmitJobInstanceRequest:
103-
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
105+
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(poolCtx, msg, senderPath)
104106
case *schedulerx.ServerKillJobInstanceRequest:
105-
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
107+
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(poolCtx, msg, senderPath)
106108
case *schedulerx.ServerKillTaskRequest:
107-
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
109+
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(poolCtx, msg, senderPath)
108110
case *schedulerx.ServerRetryTasksRequest:
109-
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(ctx, msg, senderPath)
111+
actorcomm.SxMsgReceiver() <- actorcomm.WrapSchedulerxMsg(poolCtx, msg, senderPath)
110112
case *schedulerx.WorkerReportJobInstanceStatusResponse:
111113
logger.Debugf("Receive WorkerReportJobInstanceStatusResponse from server, resp=%+v", msg)
112114
continue

0 commit comments

Comments
 (0)