Skip to content

Commit e29b29d

Browse files
authored
Merge pull request #145 from qubic/bugfix/empty-ticks
Bugfix/empty ticks
2 parents 6bbe643 + 64dc6ab commit e29b29d

9 files changed

Lines changed: 434 additions & 93 deletions

File tree

.github/workflows/push-docker-snapshot-v1.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Deploy v1 snapshot image to GHCR
33
on:
44
push:
55
branches:
6-
- 'feature/update-v1-module-name'
6+
- 'bugfix/empty-ticks'
77

88
jobs:
99
docker-publish:

legacy/app/archive-query-service/main.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,18 @@ func main() {
3434
func run() error {
3535
var cfg struct {
3636
Server struct {
37-
ReadTimeout time.Duration `conf:"default:5s"`
38-
WriteTimeout time.Duration `conf:"default:5s"`
39-
ShutdownTimeout time.Duration `conf:"default:5s"`
40-
HttpHost string `conf:"default:0.0.0.0:8000"` // nolint:revive
41-
GrpcHost string `conf:"default:0.0.0.0:8001"`
42-
ProfilingHost string `conf:"default:0.0.0.0:8002"`
43-
StatusServiceGrpcHost string `conf:"default:127.0.0.1:9901"`
44-
StatusDataCacheTtl time.Duration `conf:"default:1s"` // nolint:revive
45-
EmptyTicksTtl time.Duration `conf:"default:24h"` // nolint:revive
46-
MaxRecvSizeInMb int `conf:"default:1"`
47-
MaxSendSizeInMb int `conf:"default:10"`
37+
ReadTimeout time.Duration `conf:"default:5s"`
38+
WriteTimeout time.Duration `conf:"default:5s"`
39+
ShutdownTimeout time.Duration `conf:"default:5s"`
40+
HttpHost string `conf:"default:0.0.0.0:8000"` // nolint:revive
41+
GrpcHost string `conf:"default:0.0.0.0:8001"`
42+
ProfilingHost string `conf:"default:0.0.0.0:8002"`
43+
StatusServiceGrpcHost string `conf:"default:127.0.0.1:9901"`
44+
StatusDataCacheTtl time.Duration `conf:"default:1s"` // nolint:revive
45+
EmptyTicksTtl time.Duration `conf:"default:24h"` // nolint:revive
46+
EmptyTicksUpdateInterval time.Duration `conf:"default:5s"`
47+
MaxRecvSizeInMb int `conf:"default:1"`
48+
MaxSendSizeInMb int `conf:"default:10"`
4849
}
4950
ElasticSearch struct {
5051
Address []string `conf:"default:https://localhost:9200"`
@@ -137,7 +138,13 @@ func run() error {
137138
MaxSendMsgSize: cfg.Server.MaxSendSizeInMb * 1024 * 1024,
138139
}
139140

140-
queryService := rpc.NewQueryService(cfg.ElasticSearch.TransactionsIndex, cfg.ElasticSearch.TickDataIndex, cfg.ElasticSearch.ComputorListIndex, elasticClient, cache)
141+
queryService := rpc.NewQueryService(
142+
cfg.ElasticSearch.TransactionsIndex,
143+
cfg.ElasticSearch.TickDataIndex,
144+
cfg.ElasticSearch.ComputorListIndex,
145+
elasticClient,
146+
cache,
147+
cfg.Server.EmptyTicksUpdateInterval)
141148
rpcServer := rpc.NewServer(queryService, statusServiceClient)
142149
tickInBoundsInterceptor := rpc.NewTickWithinBoundsInterceptor(statusServiceClient, cache)
143150
var identitiesValidatorInterceptor rpc.IdentitiesValidatorInterceptor

legacy/elastic/query_tick_data.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (c *Client) QueryEmptyTicks(ctx context.Context, startTick, endTick, epoch
4949

5050
searchResult, err := c.performGetTicksQuery(ctx, startTick, endTick, epoch)
5151
if err != nil {
52-
return nil, err
52+
return nil, fmt.Errorf("performing (empty) ticks query: %w", err)
5353
}
5454

5555
total := uint32(searchResult.Hits.Total.Value) // doesn't change in following queries
@@ -73,11 +73,10 @@ func (c *Client) QueryEmptyTicks(ctx context.Context, startTick, endTick, epoch
7373
}
7474

7575
for processed < total {
76-
7776
scrollID := searchResult.ScrollID
7877
searchResult, err = c.performGetTicksScroll(ctx, scrollID)
7978
if err != nil {
80-
return nil, err
79+
return nil, fmt.Errorf("performing scroll es call: %w", err)
8180
}
8281

8382
for _, hit := range searchResult.Hits.Hits {
@@ -93,11 +92,18 @@ func (c *Client) QueryEmptyTicks(ctx context.Context, startTick, endTick, epoch
9392
nextTick = tickNumber + 1
9493
processed++
9594
}
95+
}
9696

97+
// this should never happen if total calculation is correct
98+
if searchResult.Hits.Total.Relation != "eq" {
99+
log.Printf("[ERROR] Finished empty ticks query prematurely: total [%d], processed [%d], last hits [%d], relation [%s].",
100+
total, processed, searchResult.Hits.Total.Value, searchResult.Hits.Total.Relation)
97101
}
98102

99103
// fill up with empty ticks
100104
for i := nextTick; i <= uint64(endTick); i++ {
105+
log.Printf("[DEBUG] Fill up empty tick: [%d] (start %d, end %d, total %d, processed %d, number empty %d, empty ticks size %d)",
106+
i, startTick, endTick, total, processed, numberOfEmpty, len(emptyTicks))
101107
emptyTicks = append(emptyTicks, uint32(i))
102108
}
103109

@@ -109,6 +115,7 @@ func (c *Client) performGetTicksQuery(ctx context.Context, startTick, endTick, e
109115
query := `{
110116
"size": %d,
111117
"_source": false,
118+
"track_total_hits": true,
112119
"query": {
113120
"bool": {
114121
"must": [

legacy/elastic/query_tick_data_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func createHits(total int, ids []string) struct {
229229
Total: struct {
230230
Value int `json:"value"`
231231
Relation string `json:"relation"`
232-
}{Value: total},
232+
}{Value: total, Relation: "eq"},
233233
Hits: hits,
234234
}
235235
}

legacy/rpc/query.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"sync"
88
"sync/atomic"
9+
"time"
910

1011
"github.com/qubic/archive-query-service/legacy/elastic"
1112
statusPb "github.com/qubic/go-data-publisher/status-service/protobuf"
@@ -30,15 +31,17 @@ type QueryService struct {
3031
tickDataIndex string
3132
computorListIndex string
3233
emptyTicksLock sync.Mutex
34+
emptyTicksUpdateInterval time.Duration
3335
}
3436

35-
func NewQueryService(txIndex, tickDataIndex, computorListIndex string, elasticClient elastic.SearchClient, cache QueryCache) *QueryService {
37+
func NewQueryService(txIndex, tickDataIndex, computorListIndex string, elasticClient elastic.SearchClient, cache QueryCache, emptyTicksUpdateInterval time.Duration) *QueryService {
3638
return &QueryService{
37-
elasticClient: elasticClient,
38-
txIndex: txIndex,
39-
tickDataIndex: tickDataIndex,
40-
computorListIndex: computorListIndex,
41-
cache: cache,
39+
elasticClient: elasticClient,
40+
txIndex: txIndex,
41+
tickDataIndex: tickDataIndex,
42+
computorListIndex: computorListIndex,
43+
cache: cache,
44+
emptyTicksUpdateInterval: emptyTicksUpdateInterval,
4245
}
4346
}
4447

legacy/rpc/query_empty_ticks.go

Lines changed: 133 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,88 +4,126 @@ import (
44
"context"
55
"fmt"
66
"log"
7+
"time"
78

89
statusPb "github.com/qubic/go-data-publisher/status-service/protobuf"
910
)
1011

12+
const emptyTickQueryOffset = 3
13+
1114
func (qs *QueryService) GetEmptyTicks(ctx context.Context, epoch uint32, intervals []*statusPb.TickInterval) (*EmptyTicks, error) {
12-
qs.emptyTicksLock.Lock() // costly and not threadsafe in case of update
15+
qs.emptyTicksLock.Lock() // costly
1316
defer qs.emptyTicksLock.Unlock()
1417

15-
emptyTicks := qs.cache.GetEmptyTicks(epoch)
18+
emptyTicks := qs.cache.GetEmptyTicks(epoch) // only used here
19+
err := sanityCheckData(emptyTicks, intervals, epoch) // checks intervals and empty ticks
20+
if err != nil {
21+
return nil, err
22+
}
1623

17-
if emptyTicks != nil { // some sanity checks
18-
if len(intervals) == 0 || intervals[0].Epoch != epoch || emptyTicks.Epoch != epoch || emptyTicks.StartTick != intervals[0].FirstTick {
19-
log.Printf("[ERROR] Illegal argument. Empty ticks epoch [%d] / start [%d] / end [%d] / len [%d].",
20-
emptyTicks.Epoch, emptyTicks.StartTick, emptyTicks.EndTick, len(emptyTicks.Ticks))
21-
log.Printf("[ERROR] Illegal argument. Intervals: %v", intervals)
22-
return nil, fmt.Errorf("illegal argument for epoch [%d]", epoch)
24+
if emptyTicks == nil { // reload
25+
emptyTicks, err = qs.createNewEmptyTicks(ctx, epoch, intervals)
26+
if err != nil {
27+
return nil, fmt.Errorf("failed to create empty ticks object: %w", err)
2328
}
24-
tick := uint32(0)
25-
for _, interval := range intervals {
26-
if interval.FirstTick < tick {
27-
return nil, fmt.Errorf("unsorted intervals: %v", intervals)
28-
}
29-
tick = interval.FirstTick
29+
} else if isNextQueryFeasible(emptyTicks.LastUpdate, qs.emptyTicksUpdateInterval) {
30+
// only refresh if not recently updated (to avoid multiple similar updates after each other)
31+
err = qs.updateEmptyTicks(ctx, emptyTicks, intervals)
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to update empty ticks object: %w", err)
3034
}
3135
}
3236

33-
if emptyTicks == nil { // reload
37+
return emptyTicks.Clone(), nil // return deep copy and not the cached instance
38+
}
3439

35-
var emptyTickList []uint32
36-
var startTick uint32
37-
var endTick uint32
38-
for _, interval := range intervals {
39-
if interval.Epoch == epoch {
40-
if startTick == 0 {
41-
startTick = interval.FirstTick
42-
}
43-
if endTick < interval.LastTick {
44-
endTick = interval.LastTick
45-
}
46-
ticks, err := qs.queryEmptyTicksFromElastic(ctx, interval.FirstTick, interval.LastTick, epoch)
47-
if err != nil {
48-
return nil, err
49-
}
50-
emptyTickList = append(emptyTickList, ticks...)
51-
}
40+
func (qs *QueryService) createNewEmptyTicks(ctx context.Context, epoch uint32, intervals []*statusPb.TickInterval) (*EmptyTicks, error) {
41+
var emptyTickList []uint32
42+
var startTick uint32
43+
var endTick uint32
44+
for _, interval := range intervals {
45+
if startTick == 0 {
46+
startTick = interval.FirstTick
5247
}
53-
tickMap := make(map[uint32]bool, len(emptyTickList))
54-
for _, tick := range emptyTickList {
55-
tickMap[tick] = true
48+
if endTick < interval.LastTick {
49+
endTick = interval.LastTick
5650
}
57-
emptyTicks = &EmptyTicks{
58-
Epoch: epoch,
59-
StartTick: startTick,
60-
EndTick: endTick,
61-
Ticks: tickMap,
51+
// initially we ignore the offset this could get improved but typically is no problem
52+
ticks, err := qs.queryEmptyTicksFromElastic(ctx, interval.FirstTick, interval.LastTick, epoch)
53+
if err != nil {
54+
return nil, fmt.Errorf("failed to query empty ticks: %w", err)
6255
}
56+
emptyTickList = append(emptyTickList, ticks...)
57+
}
58+
tickMap := make(map[uint32]bool, len(emptyTickList))
59+
for _, tick := range emptyTickList {
60+
tickMap[tick] = true
61+
}
62+
emptyTicks := &EmptyTicks{
63+
Epoch: epoch,
64+
StartTick: startTick,
65+
EndTick: endTick,
66+
Ticks: tickMap,
67+
LastUpdate: time.Now(),
68+
}
69+
if endTick > 0 { // ignore empty intervals on epoch change
6370
qs.cache.SetEmptyTicks(emptyTicks)
71+
}
72+
return emptyTicks, nil
73+
}
6474

65-
} else { // add missing ticks if necessary. Needs lock as we operate on the cached value!
66-
67-
for _, interval := range intervals {
68-
if interval.Epoch == epoch {
69-
if emptyTicks.EndTick < interval.LastTick {
70-
from := max(emptyTicks.EndTick+1, interval.FirstTick) // do no reload ticks we already have
71-
ticks, err := qs.queryEmptyTicksFromElastic(ctx, from, interval.LastTick, epoch)
72-
if err != nil {
73-
return nil, err
74-
}
75-
for _, tick := range ticks {
76-
emptyTicks.Ticks[tick] = true
77-
}
78-
emptyTicks.EndTick = interval.LastTick
75+
func (qs *QueryService) updateEmptyTicks(ctx context.Context, emptyTicks *EmptyTicks, intervals []*statusPb.TickInterval) error {
76+
emptyTicks.LastUpdate = time.Now()
77+
for i, interval := range intervals {
78+
if emptyTicks.EndTick < interval.LastTick {
79+
// add missing ticks if necessary. Needs lock as we operate on the cached value!
80+
from, to := calculateRange(emptyTicks, interval, isLast(i, len(intervals)))
81+
if from <= to {
82+
ticks, err := qs.queryEmptyTicksFromElastic(ctx, from, to, emptyTicks.Epoch)
83+
if err != nil {
84+
return fmt.Errorf("failed to query empty ticks: %w", err)
85+
}
86+
for _, tick := range ticks {
87+
emptyTicks.Ticks[tick] = true
7988
}
89+
emptyTicks.EndTick = to
8090
}
8191
}
82-
qs.cache.SetEmptyTicks(emptyTicks) // not sure if this is necessary (update ttl, ...)
92+
}
93+
return nil
94+
}
8395

96+
func calculateRange(emptyTicks *EmptyTicks, interval *statusPb.TickInterval, isLastInterval bool) (uint32, uint32) {
97+
from := max(emptyTicks.EndTick+1, interval.FirstTick) // do no reload ticks we already have
98+
to := interval.LastTick
99+
if to >= emptyTickQueryOffset && isLastInterval && isFirstCall(emptyTicks, interval) {
100+
// all intervals except the last one do not change anymore and can be queried completely
101+
// the first time we only query up to the empty tick offset
102+
to = interval.LastTick - emptyTickQueryOffset // can be < from theoretically
84103
}
85-
return emptyTicks, nil
104+
return from, to
86105
}
87106

88-
func (qs *QueryService) queryEmptyTicksFromElastic(ctx context.Context, from, to uint32, epoch uint32) ([]uint32, error) {
107+
func isNextQueryFeasible(lastUpdate time.Time, interval time.Duration) bool {
108+
return time.Since(lastUpdate) >= interval
109+
}
110+
111+
func isFirstCall(emptyTicks *EmptyTicks, interval *statusPb.TickInterval) bool {
112+
// lastTick should only increase.
113+
// if we have endTick == lastTick - offset, then it is the second call
114+
// if we have endTick < lastTick - offset, then the last tick increased
115+
firstCall := emptyTicks.EndTick < interval.LastTick-emptyTickQueryOffset
116+
if !firstCall {
117+
log.Printf("[DEBUG] Second empty ticks call for end tick [%d].", interval.LastTick)
118+
}
119+
return firstCall
120+
}
121+
122+
func isLast(index int, length int) bool {
123+
return index == length-1
124+
}
125+
126+
func (qs *QueryService) queryEmptyTicksFromElastic(ctx context.Context, from, to, epoch uint32) ([]uint32, error) {
89127
if to-from > 100 {
90128
log.Printf("[DEBUG] Query empty ticks: from [%d], to [%d], epoch [%d]", from, to, epoch)
91129
}
@@ -100,3 +138,41 @@ func (qs *QueryService) queryEmptyTicksFromElastic(ctx context.Context, from, to
100138
qs.ConsecutiveElasticErrorCount.Store(0)
101139
return ticks, nil
102140
}
141+
142+
func sanityCheckData(emptyTicks *EmptyTicks, intervals []*statusPb.TickInterval, epoch uint32) error {
143+
err := verifySortedInEpoch(intervals, epoch)
144+
if err != nil {
145+
return err
146+
}
147+
148+
if emptyTicks != nil {
149+
if emptyTicks.Epoch != epoch { // should not be possible (remove?)
150+
log.Printf("[ERROR] unexpected empty ticks data (expected data for epoch [%d], but got [%d]).", epoch, emptyTicks.Epoch)
151+
return fmt.Errorf("illegal cache state for epoch [%d]", epoch)
152+
}
153+
154+
// if len intervals == 0 (can happen on epoch change), then proceed with current empty ticks
155+
if len(intervals) != 0 && (intervals[0].Epoch != epoch || emptyTicks.StartTick != intervals[0].FirstTick) {
156+
log.Printf("[ERROR] Illegal argument. Empty ticks epoch [%d] / start [%d] / end [%d] / len [%d].",
157+
emptyTicks.Epoch, emptyTicks.StartTick, emptyTicks.EndTick, len(emptyTicks.Ticks))
158+
log.Printf("[ERROR] Illegal argument. Intervals: %v", intervals)
159+
return fmt.Errorf("illegal state for epoch [%d]", epoch)
160+
}
161+
}
162+
163+
return nil
164+
}
165+
166+
func verifySortedInEpoch(intervals []*statusPb.TickInterval, epoch uint32) error {
167+
tick := uint32(0)
168+
for _, interval := range intervals {
169+
if interval.Epoch != epoch {
170+
return fmt.Errorf("got interval for wrong epoch [%d] (expected: [%d])", interval.Epoch, epoch)
171+
}
172+
if interval.FirstTick < tick {
173+
return fmt.Errorf("unsorted intervals: %v", intervals)
174+
}
175+
tick = interval.FirstTick
176+
}
177+
return nil
178+
}

0 commit comments

Comments
 (0)