Skip to content

Commit 9ea5071

Browse files
authored
Merge pull request #1504 from c3llus/feat/issue-1499/new-max-defer-delay
nsqd: add support for configurable max-defer-timeout for DPUB
2 parents 3103474 + d182ee2 commit 9ea5071

File tree

7 files changed

+35
-21
lines changed

7 files changed

+35
-21
lines changed

apps/nsqd/main.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (p *program) Init(env svc.Environment) error {
5454
cfg.Validate()
5555

5656
options.Resolve(opts, flagSet, cfg)
57+
applyBackwardCompatibility(opts, flagSet)
5758

5859
nsqd, err := nsqd.New(opts)
5960
if err != nil {
@@ -104,3 +105,13 @@ func (p *program) Context() context.Context {
104105
func logFatal(f string, args ...interface{}) {
105106
lg.LogFatal("[nsqd] ", f, args...)
106107
}
108+
109+
// applyBackwardCompatibility applies backward compatibility rules to options after flag resolution
110+
func applyBackwardCompatibility(opts *nsqd.Options, flagSet *flag.FlagSet) {
111+
// when max-defer-timeout was not explicitly set, refer to the max-req-timeout value
112+
if flag := flagSet.Lookup("max-defer-timeout"); flag != nil && flag.Value.String() == flag.DefValue {
113+
opts.MaxDeferTimeout = opts.MaxReqTimeout
114+
}
115+
116+
// ... other backward compatibility rules can be added here
117+
}

apps/nsqd/options.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
161161
flagSet.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
162162
flagSet.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
163163
flagSet.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")
164+
flagSet.Duration("max-defer-timeout", opts.MaxDeferTimeout, "maximum duration when deferring a message")
164165

165166
// client overridable configuration options
166167
flagSet.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats")

nsqd/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
245245
return nil, http_api.Err{400, "INVALID_DEFER"}
246246
}
247247
deferred = time.Duration(di) * time.Millisecond
248-
if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
248+
if deferred < 0 || deferred > s.nsqd.getOpts().MaxDeferTimeout {
249249
return nil, http_api.Err{400, "INVALID_DEFER"}
250250
}
251251
}

nsqd/options.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ type Options struct {
4747
QueueScanDirtyPercent float64
4848

4949
// msg and command options
50-
MsgTimeout time.Duration `flag:"msg-timeout"`
51-
MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
52-
MaxMsgSize int64 `flag:"max-msg-size"`
53-
MaxBodySize int64 `flag:"max-body-size"`
54-
MaxReqTimeout time.Duration `flag:"max-req-timeout"`
55-
ClientTimeout time.Duration
50+
MsgTimeout time.Duration `flag:"msg-timeout"`
51+
MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
52+
MaxMsgSize int64 `flag:"max-msg-size"`
53+
MaxBodySize int64 `flag:"max-body-size"`
54+
MaxReqTimeout time.Duration `flag:"max-req-timeout"`
55+
ClientTimeout time.Duration
56+
MaxDeferTimeout time.Duration `flag:"max-defer-timeout"`
5657

5758
// client overridable configuration options
5859
MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"`
@@ -151,12 +152,13 @@ func NewOptions() *Options {
151152
QueueScanWorkerPoolMax: 4,
152153
QueueScanDirtyPercent: 0.25,
153154

154-
MsgTimeout: 60 * time.Second,
155-
MaxMsgTimeout: 15 * time.Minute,
156-
MaxMsgSize: 1024 * 1024,
157-
MaxBodySize: 5 * 1024 * 1024,
158-
MaxReqTimeout: 1 * time.Hour,
159-
ClientTimeout: 60 * time.Second,
155+
MsgTimeout: 60 * time.Second,
156+
MaxMsgTimeout: 15 * time.Minute,
157+
MaxMsgSize: 1024 * 1024,
158+
MaxBodySize: 5 * 1024 * 1024,
159+
MaxReqTimeout: 1 * time.Hour,
160+
ClientTimeout: 60 * time.Second,
161+
MaxDeferTimeout: 1 * time.Hour,
160162

161163
MaxHeartbeatInterval: 60 * time.Second,
162164
MaxRdyCount: 2500,

nsqd/protocol_v2.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -916,14 +916,14 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
916916
timeoutMs, err := protocol.ByteToBase10(params[2])
917917
if err != nil {
918918
return nil, protocol.NewFatalClientErr(err, "E_INVALID",
919-
fmt.Sprintf("DPUB could not parse timeout %s", params[2]))
919+
fmt.Sprintf("DPUB could not parse defer timeout %s", params[2]))
920920
}
921921
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
922922

923-
if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxReqTimeout {
923+
if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxDeferTimeout {
924924
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
925-
fmt.Sprintf("DPUB timeout %d out of range 0-%d",
926-
timeoutMs, p.nsqd.getOpts().MaxReqTimeout/time.Millisecond))
925+
fmt.Sprintf("DPUB defer timeout %d out of range 0-%d",
926+
timeoutMs, p.nsqd.getOpts().MaxDeferTimeout/time.Millisecond))
927927
}
928928

929929
bodyLen, err := readLen(client.Reader, client.lenSlice)

nsqd/protocol_v2_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,12 +671,12 @@ func TestDPUB(t *testing.T) {
671671
test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount)))
672672

673673
// duration out of range
674-
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
674+
nsq.DeferredPublish(topicName, opts.MaxDeferTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
675675
resp, _ = nsq.ReadResponse(conn)
676676
frameType, data, _ = nsq.UnpackResponse(resp)
677677
t.Logf("frameType: %d, data: %s", frameType, data)
678678
test.Equal(t, frameTypeError, frameType)
679-
test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data))
679+
test.Equal(t, "E_INVALID DPUB defer timeout 3600100 out of range 0-3600000", string(data))
680680
}
681681

682682
func TestTouch(t *testing.T) {

nsqd/protocol_v2_unixsocket_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,12 +531,12 @@ func TestUnixSocketDPUB(t *testing.T) {
531531
test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount)))
532532

533533
// duration out of range
534-
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
534+
nsq.DeferredPublish(topicName, opts.MaxDeferTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
535535
resp, _ = nsq.ReadResponse(conn)
536536
frameType, data, _ = nsq.UnpackResponse(resp)
537537
t.Logf("frameType: %d, data: %s", frameType, data)
538538
test.Equal(t, frameTypeError, frameType)
539-
test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data))
539+
test.Equal(t, "E_INVALID DPUB defer timeout 3600100 out of range 0-3600000", string(data))
540540
}
541541

542542
func TestUnixSocketTouch(t *testing.T) {

0 commit comments

Comments
 (0)