Skip to content

Commit 28711b5

Browse files
author
Jackie Li
committed
add call with headers
1 parent b8072fc commit 28711b5

File tree

4 files changed

+76
-9
lines changed

4 files changed

+76
-9
lines changed

api.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ func (p *Process) WaitFor(d time.Duration) error {
9393

9494
// RegisterFunc registers the function using it's reflection name
9595
func (p Process) RegisterFunc(function interface{}) (funcName string, err error) {
96-
if reflect.TypeOf(function).Kind() != reflect.Func {
97-
return "", errors.New("f is not a function")
96+
funcName, err = fn(function)
97+
if err != nil {
98+
return "", err
9899
}
99-
funcName = runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
100100
registered := p.server.IsTaskRegistered(funcName)
101101
if !registered {
102102
err = p.server.RegisterTask(funcName, function)
@@ -116,22 +116,38 @@ func (p Process) Register(funcName string, function interface{}) error {
116116
return nil
117117
}
118118

119-
// Invoke registers the func with it's reflect name, and sends the task
119+
// Invoke calls the func by its reflect name
120120
func (p Process) Invoke(f interface{}, args []tasks.Arg) (jobID string, err error) {
121-
funcName, err := p.RegisterFunc(f)
121+
return p.InvokeWithHeaders(f, args, nil)
122+
}
123+
124+
// InvokeWithHeaders calls the function by its reflect name with headers
125+
func (p Process) InvokeWithHeaders(f interface{}, args []tasks.Arg, headers tasks.Headers) (jobID string, err error) {
126+
funcName, err := fn(f)
122127
if err != nil {
123128
return "", err
124129
}
125-
return p.Call(funcName, args)
130+
return p.CallWithHeaders(funcName, args, headers)
126131
}
127132

128133
// Call calls a registered function, the arguments needs to be in the machinery []Arg format
129134
func (p Process) Call(funcName string, args []tasks.Arg) (jobID string, err error) {
135+
return p.CallWithHeaders(funcName, args, nil)
136+
}
137+
138+
// CallWithHeaders calls a register function with metadata
139+
func (p Process) CallWithHeaders(funcName string, args []tasks.Arg, headers tasks.Headers) (jobID string, err error) {
140+
if !p.server.IsTaskRegistered(funcName) {
141+
return "", errors.Errorf("function %s is not registered", funcName)
142+
}
143+
130144
sig, err := tasks.NewSignature(funcName, args)
131145
if err != nil {
132146
return "", errors.Wrap(err, "process call")
133147
}
134148

149+
sig.Headers = headers
150+
135151
r, err := p.server.SendTask(sig)
136152
if err != nil {
137153
return "", errors.Wrapf(err, "call func %s", funcName)
@@ -584,3 +600,10 @@ func interruptSubject(jobID string) string {
584600
func headerSubject(jobID string) string {
585601
return "headers_" + jobID
586602
}
603+
604+
func fn(function interface{}) (string, error) {
605+
if reflect.TypeOf(function).Kind() != reflect.Func {
606+
return "", errors.New("f is not a function")
607+
}
608+
return runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name(), nil
609+
}

api_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,13 +330,57 @@ func TestGracefulWait(t *testing.T) {
330330
assert.EqualError(t, err, "Worker quit gracefully")
331331
}
332332

333+
func TestInvokeUnregistered(t *testing.T) {
334+
p, err := New("redis://localhost:6379")
335+
require.NoError(t, err)
336+
337+
_, err = p.Invoke(task, []tasks.Arg{
338+
{
339+
Type: "string",
340+
Value: "test invoke",
341+
},
342+
})
343+
require.Error(t, err)
344+
}
345+
346+
func TestCallWithHeaders(t *testing.T) {
347+
p, err := New("redis://localhost:6379")
348+
require.NoError(t, err)
349+
task := func(ctx context.Context, msg string) (string, error) {
350+
sig := tasks.SignatureFromContext(ctx)
351+
assert.EqualValues(t, "bar", sig.Headers["foo"])
352+
return "received " + msg, nil
353+
}
354+
355+
p.RegisterFunc(task)
356+
357+
jobID, err := p.InvokeWithHeaders(
358+
task,
359+
[]tasks.Arg{
360+
{
361+
Type: "string",
362+
Value: "test invoke",
363+
},
364+
},
365+
map[string]interface{}{
366+
"foo": "bar",
367+
},
368+
)
369+
370+
r := p.GetResult(jobID)
371+
_, err = r.Get(1 * time.Millisecond)
372+
require.NoError(t, err)
373+
}
374+
333375
func TestInvoke(t *testing.T) {
334376
p, err := New("redis://localhost:6379")
335377
require.NoError(t, err)
336378
task := func(msg string) (string, error) {
337379
return "received " + msg, nil
338380
}
339381

382+
p.RegisterFunc(task)
383+
340384
jobID, err := p.Invoke(task, []tasks.Arg{
341385
{
342386
Type: "string",

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/jackielii/process
33
go 1.12
44

55
require (
6-
github.com/RichardKnop/machinery v1.6.2
6+
github.com/RichardKnop/machinery v1.6.5
77
github.com/gomodule/redigo v2.0.0+incompatible
88
github.com/google/uuid v1.1.0
99
github.com/pkg/errors v0.8.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ git.apache.org/thrift.git v0.0.0-20181218151757-9b75e4fe745a/go.mod h1:fPE2ZNJGy
1212
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
1313
github.com/RichardKnop/logging v0.0.0-20181101035820-b1d5d44c82d6 h1:Vgjpn7q8aQnye8nVJUboZbPd8DFLjYafgjJN2nO73xc=
1414
github.com/RichardKnop/logging v0.0.0-20181101035820-b1d5d44c82d6/go.mod h1:rJJ84PyA/Wlmw1hO+xTzV2wsSUon6J5ktg0g8BF2PuU=
15-
github.com/RichardKnop/machinery v1.6.2 h1:Mn53hDOPj9RF6Lt6HU8DVZCt5PTM4OoZydh3LO4L/ao=
16-
github.com/RichardKnop/machinery v1.6.2/go.mod h1:+QjVq/Z0aWiTc1O0lq34oK9PY6NzYjxVNlLlgTaqoJE=
15+
github.com/RichardKnop/machinery v1.6.5 h1:naU8+o/B1bdQeugr8MLXzoE3qCbeonBGlwOB0b2aL2Y=
16+
github.com/RichardKnop/machinery v1.6.5/go.mod h1:+QjVq/Z0aWiTc1O0lq34oK9PY6NzYjxVNlLlgTaqoJE=
1717
github.com/RichardKnop/redsync v1.2.0 h1:gK35hR3zZkQigHKm8wOGb9MpJ9BsrW6MzxezwjTcHP0=
1818
github.com/RichardKnop/redsync v1.2.0/go.mod h1:9b8nBGAX3bE2uCfJGSnsDvF23mKyHTZzmvmj5FH3Tp0=
1919
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=

0 commit comments

Comments
 (0)