Skip to content

Commit f1df935

Browse files
committed
feature added to MockServer #5
1 parent 04bb1ab commit f1df935

File tree

12 files changed

+179
-115
lines changed

12 files changed

+179
-115
lines changed

.vscode/settings.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
"go.testOnSave": true,
33
"go.coverOnSave": true,
44
"go.coverageOptions": "showUncoveredCodeOnly",
5-
"go.inferGopath": false,
65
"go.testTimeout": "5s",
76
"editor.tabSize": 8,
87
"editor.detectIndentation": false,

client-rr.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func RoundRobinClient(ctx context.Context, debug *log.Logger) Client {
2727
return c
2828
}
2929

30-
func (c *rrServer) receivers() (<-chan Message, context.Context) {
30+
func (c *rrServer) Receivers() (<-chan Message, context.Context) {
3131
return nil, nil
3232
}
3333

@@ -64,13 +64,13 @@ func (c *rrServer) Submit(req Task) Task {
6464
return cli.Submit(req)
6565
}
6666

67-
func (c *rrServer) assignTask(tid TaskID) {
67+
func (c *rrServer) AssignTask(tid TaskID) {
6868
}
6969

70-
func (c *rrServer) getTask(tid TaskID) Task {
70+
func (c *rrServer) GetTask(tid TaskID) Task {
7171
return nil
7272
}
7373

74-
func (c *rrServer) extractTask(tid TaskID) Task {
74+
func (c *rrServer) ExtractTask(tid TaskID) Task {
7575
return nil
7676
}

client-single.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type (
1414
mQueue chan Message
1515
reqQueue []Task
1616
climutex *sync.Mutex
17+
reply PacketEmiter
1718
}
1819
)
1920

@@ -30,7 +31,7 @@ func SingleServerClient(ctx context.Context, debug *log.Logger) Client {
3031
return c
3132
}
3233

33-
func (c *singleServer) receivers() (<-chan Message, context.Context) {
34+
func (c *singleServer) Receivers() (<-chan Message, context.Context) {
3435
return c.mQueue, c.ctx
3536
}
3637

@@ -50,9 +51,9 @@ func (c *singleServer) AddServers(servers ...Conn) {
5051

5152
c.configured = true
5253

53-
for _, server := range servers {
54-
c.addServer(server)
55-
}
54+
reply, _ := c.addServer(servers[0])
55+
c.reply = reply
56+
5657
return
5758
}
5859

@@ -62,22 +63,20 @@ func (c *singleServer) Submit(req Task) Task {
6263

6364
c.reqQueue = append(c.reqQueue, req)
6465

65-
for _, s := range c.listServers() {
66-
c.sendTo(s, req.Packet())
67-
}
66+
c.reply.Send(req.Packet())
6867

6968
return req
7069
}
7170

72-
func (c *singleServer) assignTask(tid TaskID) {
71+
func (c *singleServer) AssignTask(tid TaskID) {
7372
c.climutex.Lock()
7473
defer c.climutex.Unlock()
7574

7675
c.jobs[tid.String()] = c.reqQueue[0]
7776
c.reqQueue = c.reqQueue[1:]
7877
}
7978

80-
func (c *singleServer) getTask(tid TaskID) Task {
79+
func (c *singleServer) GetTask(tid TaskID) Task {
8180
c.climutex.Lock()
8281
defer c.climutex.Unlock()
8382

@@ -87,7 +86,7 @@ func (c *singleServer) getTask(tid TaskID) Task {
8786
return NilTask
8887
}
8988

90-
func (c *singleServer) extractTask(tid TaskID) Task {
89+
func (c *singleServer) ExtractTask(tid TaskID) Task {
9190
c.climutex.Lock()
9291
defer c.climutex.Unlock()
9392

client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@ type (
1414
Submit(Task) Task
1515
Close() error
1616

17-
assignTask(tid TaskID)
18-
getTask(TaskID) Task
19-
extractTask(TaskID) Task
20-
receivers() (<-chan Message, context.Context)
17+
AssignTask(TaskID)
18+
GetTask(TaskID) Task
19+
ExtractTask(TaskID) Task
20+
Receivers() (<-chan Message, context.Context)
2121
}
2222
)
2323

2424
func clientLoop(c Client, dbg *log.Logger) {
2525
var tid TaskID
2626
var err error
2727

28-
mQueue, ctx := c.receivers()
28+
mQueue, ctx := c.Receivers()
2929

3030
for {
3131
select {
@@ -53,23 +53,23 @@ func clientLoop(c Client, dbg *log.Logger) {
5353
debug(dbg, "CLI\tprotocol.JobCreated TID [%s] err : %v\n", string(msg.Pkt.At(0).Bytes()), err)
5454
panic(err)
5555
}
56-
c.assignTask(tid)
56+
c.AssignTask(tid)
5757

5858
case protocol.WorkData, protocol.WorkWarning, protocol.WorkStatus:
5959
if err = tid.Cast(msg.Pkt.At(0)); err != nil {
6060
debug(dbg, "CLI\t%s TID [%s] err : %v\n", msg.Pkt.Cmd(), string(msg.Pkt.At(0).Bytes()), err)
6161
panic(err)
6262
}
6363

64-
c.getTask(tid).Handle(msg.Pkt)
64+
c.GetTask(tid).Handle(msg.Pkt)
6565

6666
case protocol.WorkComplete, protocol.WorkFail, protocol.WorkException:
6767
if err = tid.Cast(msg.Pkt.At(0)); err != nil {
6868
debug(dbg, "CLI\t%s TID [%s] err : %v\n", msg.Pkt.Cmd(), string(msg.Pkt.At(0).Bytes()), err)
6969
panic(err)
7070
}
7171

72-
c.extractTask(tid).Handle(msg.Pkt)
72+
c.ExtractTask(tid).Handle(msg.Pkt)
7373

7474
case protocol.StatusRes:
7575
panic("status_res not wrote")

error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
)
2727

2828
func (e *ExceptionError) Error() string {
29-
return fmt.Sprintf("Error [%x]", e.Payload)
29+
return fmt.Sprintf("Error [%q]", e.Payload)
3030
}
3131

3232
func (e *IncoherentError) Error() string {

message.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package gearman // import "github.com/nathanaelle/gearman"
2+
3+
import (
4+
"sync"
5+
6+
"github.com/nathanaelle/gearman/v2/protocol"
7+
)
8+
9+
type (
10+
// PacketEmiter is a interface to send packet to a socket pool
11+
PacketEmiter interface {
12+
Send(...protocol.Packet)
13+
}
14+
15+
// Message is the structure for communication between a pool and a Client or a Worker
16+
Message struct {
17+
Reply PacketEmiter
18+
Server Conn
19+
Pkt protocol.Packet
20+
}
21+
22+
chanPacketEmiter struct {
23+
c chan<- protocol.Packet
24+
}
25+
26+
funcPacketEmiter struct {
27+
lock *sync.Mutex
28+
packetEmiter func(protocol.Packet, PacketEmiter)
29+
}
30+
)
31+
32+
var _ PacketEmiter = chanPacketEmiter{}
33+
var _ PacketEmiter = &funcPacketEmiter{}
34+
35+
func newChanPacketEmiter(c chan<- protocol.Packet) PacketEmiter {
36+
return chanPacketEmiter{c}
37+
}
38+
39+
func (cpe chanPacketEmiter) Send(pkts ...protocol.Packet) {
40+
for _, pkt := range pkts {
41+
cpe.c <- pkt
42+
}
43+
}
44+
45+
func newFuncPacketEmiter(pe func(protocol.Packet, PacketEmiter)) PacketEmiter {
46+
return &funcPacketEmiter{
47+
packetEmiter: pe,
48+
}
49+
}
50+
51+
func (fpe *funcPacketEmiter) Send(pkts ...protocol.Packet) {
52+
for _, pkt := range pkts {
53+
fpe.packetEmiter(pkt, fpe)
54+
}
55+
}

mock.go

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package gearman
1+
package gearman // import "github.com/nathanaelle/gearman"
22

33
import (
44
"bytes"
@@ -28,10 +28,12 @@ func NewMockServer() *MockServer {
2828
}
2929
}
3030

31+
// AddServers implements Client.AddServers and Worker.AddServers
3132
func (mc *MockServer) AddServers(...Conn) {
3233

3334
}
3435

36+
// AddHandler implements Worker.AddHandler
3537
func (mc *MockServer) AddHandler(name string, job Job) Worker {
3638
mc.lock.Lock()
3739
defer mc.lock.Unlock()
@@ -41,6 +43,7 @@ func (mc *MockServer) AddHandler(name string, job Job) Worker {
4143
return mc
4244
}
4345

46+
// DelHandler implements Worker.DelHandler
4447
func (mc *MockServer) DelHandler(name string) Worker {
4548
mc.lock.Lock()
4649
defer mc.lock.Unlock()
@@ -50,6 +53,7 @@ func (mc *MockServer) DelHandler(name string) Worker {
5053
return mc
5154
}
5255

56+
// DelAllHandlers implements Worker.DelAllHandlers
5357
func (mc *MockServer) DelAllHandlers() Worker {
5458
mc.lock.Lock()
5559
defer mc.lock.Unlock()
@@ -59,6 +63,7 @@ func (mc *MockServer) DelAllHandlers() Worker {
5963
return mc
6064
}
6165

66+
// GetHandler implements Worker.GetHandler
6267
func (mc *MockServer) GetHandler(name string) Job {
6368
mc.lock.Lock()
6469
defer mc.lock.Unlock()
@@ -70,57 +75,76 @@ func (mc *MockServer) GetHandler(name string) Job {
7075
return FailJob
7176
}
7277

78+
// Receivers implements Client.Receivers and Worker.Receivers
7379
func (mc *MockServer) Receivers() (<-chan Message, context.Context) {
7480
return nil, nil
7581
}
7682

83+
// Close implements Client.Close and Worker.Close
7784
func (mc *MockServer) Close() error {
7885
return nil
7986
}
8087

88+
// Submit implements Client.Submit
8189
func (mc *MockServer) Submit(req Task) Task {
8290
pkt := req.Packet()
8391

8492
switch pkt.Cmd() {
8593
case protocol.SubmitJob:
86-
reply := make(chan protocol.Packet, 5)
94+
reply := mockPacketEmiter(req)
8795

8896
go runWorker(mc.GetHandler(string(pkt.At(0).Bytes())), bytes.NewReader(pkt.At(2).Bytes()), reply, TaskID{})
8997

90-
go func() {
91-
for res := range reply {
92-
switch res.Cmd() {
93-
case protocol.WorkCompleteWorker:
94-
taskRes, _ := protocol.WorkComplete.Borrow(res)
95-
go req.Handle(taskRes)
96-
close(reply)
97-
break
98-
99-
default:
100-
log.Fatalf("res unknown: %v %q", res.Cmd(), res.Payload())
101-
}
102-
}
103-
}()
104-
10598
default:
10699
log.Fatalf("unknown: %v %q", pkt.Cmd(), pkt.Payload())
107100
}
108101

109102
return req
110103
}
111104

112-
func (mc *MockServer) assignTask(tid TaskID) {
105+
// AssignTask implements Client.AssignTask
106+
func (mc *MockServer) AssignTask(tid TaskID) {
113107

114108
}
115109

116-
func (mc *MockServer) getTask(TaskID) Task {
110+
// GetTask implements Client.GetTask
111+
func (mc *MockServer) GetTask(TaskID) Task {
117112
return nil
118113
}
119114

120-
func (mc *MockServer) extractTask(TaskID) Task {
115+
// ExtractTask implements Client.ExtractTask
116+
func (mc *MockServer) ExtractTask(TaskID) Task {
121117
return nil
122118
}
123119

124-
func (mc *MockServer) receivers() (<-chan Message, context.Context) {
125-
return nil, nil
120+
func mockPacketEmiter(req Task) PacketEmiter {
121+
lock := sync.Mutex{}
122+
123+
return newFuncPacketEmiter(func(res protocol.Packet, fpe PacketEmiter) {
124+
lock.Lock()
125+
defer lock.Unlock()
126+
127+
switch res.Cmd() {
128+
case protocol.WorkCompleteWorker:
129+
taskRes, _ := protocol.WorkComplete.Borrow(res)
130+
go req.Handle(taskRes)
131+
return
132+
133+
case protocol.WorkFailWorker:
134+
taskRes, _ := protocol.WorkFail.Borrow(res)
135+
go req.Handle(taskRes)
136+
return
137+
138+
case protocol.WorkExceptionWorker:
139+
taskRes, _ := protocol.WorkException.Borrow(res)
140+
go req.Handle(taskRes)
141+
return
142+
143+
case protocol.WorkDataWorker:
144+
taskRes, _ := protocol.WorkData.Borrow(res)
145+
go req.Handle(taskRes)
146+
return
147+
}
148+
log.Fatalf("res unknown: %v %q", res.Cmd(), res.Payload())
149+
})
126150
}

mock_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gearman
22

33
import (
44
"context"
5+
"errors"
56
"io"
67
"sync"
78
"testing"
@@ -31,6 +32,13 @@ func mockServerClient(cancel context.CancelFunc, wg *sync.WaitGroup, cli Client,
3132
if !validResult(t, []byte("tset"), nil)(r.Value()) {
3233
return
3334
}
35+
36+
r = cli.Submit(NewTask("esrever", []byte("test")))
37+
38+
if !validResult(t, nil, errors.New(`Error ["job doesn't exist"]`))(r.Value()) {
39+
return
40+
}
41+
3442
}
3543

3644
func mockServerWorker(ctx context.Context, wg *sync.WaitGroup, wkr Worker, t *testing.T) {

packet.go

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)