Skip to content

Commit d11098d

Browse files
committed
queue update
1 parent d05e5b2 commit d11098d

File tree

4 files changed

+29
-39
lines changed

4 files changed

+29
-39
lines changed

client/attestation/attestation.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/flare-foundation/go-flare-common/pkg/database"
1414
"github.com/flare-foundation/go-flare-common/pkg/events"
1515
"github.com/flare-foundation/go-flare-common/pkg/logger"
16-
"github.com/flare-foundation/go-flare-common/pkg/queue"
1716

1817
bitvotes "github.com/flare-foundation/fdc-client/client/attestation/bitVotes"
1918
"github.com/flare-foundation/fdc-client/client/config"
@@ -129,15 +128,23 @@ func AttestationFromDatabaseLog(request database.Log) (Attestation, error) {
129128

130129
// Handle sends the attestation request to the correct verifier server and validates the response.
131130
// The response is saved in the struct.
132-
func (a *Attestation) Handle(ctx context.Context) error {
133-
if a.Status == Success || *a.RoundStatus == Done {
134-
logger.Debugf("discarding request in round %d", a.RoundID)
135-
return fmt.Errorf("%s, handling already confirmed request or round closed", queue.NotRatedDequeue)
131+
func (a *Attestation) Discard(ctx context.Context) bool {
132+
if a.Status == Success {
133+
logger.Debugf("discarding already confirmed request in round %d", a.RoundID)
134+
return true
135+
} else if *a.RoundStatus == Done {
136+
logger.Debugf("discarding request in finished round %d", a.RoundID)
137+
return true
136138
} else if *a.RoundStatus == Consensus && !a.Consensus {
137-
logger.Debugf("discarding request in round %d", a.RoundID)
138-
return fmt.Errorf("%s, delayed request not in consensus", queue.NotRatedDequeue)
139+
logger.Debugf("discarding unselected request in round %d", a.RoundID)
140+
return true
139141
}
142+
return false
143+
}
140144

145+
// Handle sends the attestation request to the correct verifier server and validates the response.
146+
// The response is saved in the struct.
147+
func (a *Attestation) Handle(ctx context.Context) error {
141148
responseBytes, confirmed, err := ResolveAttestationRequest(ctx, a)
142149
if err != nil {
143150
a.Status = ProcessError

client/manager/queues.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package manager
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/flare-foundation/go-flare-common/pkg/logger"
87
"github.com/flare-foundation/go-flare-common/pkg/queue"
@@ -35,6 +34,11 @@ func handler(ctx context.Context, at *attestation.Attestation) error {
3534
return at.Handle(ctx)
3635
}
3736

37+
// discard discards requests that do not need to be handled
38+
func discard(ctx context.Context, at *attestation.Attestation) bool {
39+
return at.Discard(ctx)
40+
}
41+
3842
// runQueues runs all attestation queues at once.
3943
func runQueues(ctx context.Context, queues attestationQueues) {
4044
for k := range queues {
@@ -46,29 +50,15 @@ func runQueues(ctx context.Context, queues attestationQueues) {
4650

4751
// run tracks and handles all dequeued attestations from a queue.
4852
func run(ctx context.Context, queue *attestationQueue) {
49-
stop := make(chan error)
50-
5153
for {
52-
select {
53-
case err := <-stop:
54+
err := queue.DequeueAsync(ctx, handler, discard)
55+
if err != nil {
56+
logger.Warn(err)
57+
}
58+
59+
if err := ctx.Err(); err != nil {
5460
logger.Infof("queue worker exiting: %v", err)
5561
return
56-
57-
default:
58-
if queue.Length() > 0 {
59-
go func() {
60-
err := queue.Dequeue(ctx, handler)
61-
if err != nil {
62-
logger.Warn(err)
63-
}
64-
65-
if err := ctx.Err(); err != nil {
66-
stop <- err
67-
}
68-
}()
69-
} else {
70-
time.Sleep(10 * time.Millisecond)
71-
}
7262
}
7363
}
7464
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/BurntSushi/toml v1.4.0
77
github.com/bradleyjkemp/cupaloy v2.3.0+incompatible
88
github.com/ethereum/go-ethereum v1.14.11
9+
github.com/flare-foundation/go-flare-common v1.1.0-rc.0
910
github.com/gorilla/mux v1.8.1
1011
github.com/pkg/errors v0.9.1
1112
github.com/rs/cors v1.11.1
@@ -33,7 +34,6 @@ require (
3334
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
3435
github.com/ethereum/c-kzg-4844 v1.0.3 // indirect
3536
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect
36-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121164826-d04e4a8b2979 // indirect
3737
github.com/fsnotify/fsnotify v1.7.0 // indirect
3838
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
3939
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
@@ -83,6 +83,7 @@ require (
8383
golang.org/x/sync v0.8.0 // indirect
8484
golang.org/x/sys v0.26.0 // indirect
8585
golang.org/x/text v0.19.0 // indirect
86+
golang.org/x/time v0.5.0 // indirect
8687
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
8788
gopkg.in/yaml.v2 v2.4.0 // indirect
8889
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,8 @@ github.com/ethereum/go-ethereum v1.14.11 h1:8nFDCUUE67rPc6AKxFj7JKaOa2W/W1Rse3oS
7272
github.com/ethereum/go-ethereum v1.14.11/go.mod h1:+l/fr42Mma+xBnhefL/+z11/hcmJ2egl+ScIVPjhc7E=
7373
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 h1:8NfxH2iXvJ60YRB8ChToFTUzl8awsc3cJ8CbLjGIl/A=
7474
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
75-
github.com/flare-foundation/go-flare-common v1.0.1 h1:BaOCm8YhZ4mGOXM0Qxj/rpdHirrVwbxdhXBTL/2mqQU=
76-
github.com/flare-foundation/go-flare-common v1.0.1/go.mod h1:Pb1OAtNe8jNsD2fh6syJD6kVdqxAAXWESTGgd7uQlME=
77-
github.com/flare-foundation/go-flare-common v1.0.2 h1:GZTQMmIBMVhEMMe2smuGu46jWNoe6+xzeE68XAhmxSE=
78-
github.com/flare-foundation/go-flare-common v1.0.2/go.mod h1:Pb1OAtNe8jNsD2fh6syJD6kVdqxAAXWESTGgd7uQlME=
79-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121134100-511abd70f402 h1:RdcAT9eptqC7na/Rrx7AX2Hbgsh5d34VyQEEAWUN2hI=
80-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121134100-511abd70f402/go.mod h1:Pb1OAtNe8jNsD2fh6syJD6kVdqxAAXWESTGgd7uQlME=
81-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121135110-393570c3d381 h1:ht1lL0MQ+NqPygYF23tEZu6DIIhX0ymPoWyszncRmYc=
82-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121135110-393570c3d381/go.mod h1:Pb1OAtNe8jNsD2fh6syJD6kVdqxAAXWESTGgd7uQlME=
83-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121164826-d04e4a8b2979 h1:0F6pHZRPgUhyTysR3FAHnYSw0WiJwiLW0vIEM04wVys=
84-
github.com/flare-foundation/go-flare-common v1.0.3-0.20250121164826-d04e4a8b2979/go.mod h1:Pb1OAtNe8jNsD2fh6syJD6kVdqxAAXWESTGgd7uQlME=
75+
github.com/flare-foundation/go-flare-common v1.1.0-rc.0 h1:Fkf9/RCKHa7Ynxi2G6wm9TAq9AvF3nY7jhYbmrN6BSc=
76+
github.com/flare-foundation/go-flare-common v1.1.0-rc.0/go.mod h1:znM2IpczjntjZGO4cHJKqrtdwCF1+2wWLXqAo1Ag1ek=
8577
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
8678
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
8779
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=

0 commit comments

Comments
 (0)