Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 56245b8

Browse files
committed
some more
1 parent 3661361 commit 56245b8

File tree

6 files changed

+143
-67
lines changed

6 files changed

+143
-67
lines changed

quesma/persistence/elastic_with_eviction.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"time"
1515
)
1616

17-
const MAX_DOC_COUNT = 10000 // prototype TODO: fix/make configurable/idk/etc
17+
const MAX_DOC_COUNT = 10000 // TODO: fix/make configurable/idk/etc
1818
const defaultSizeInBytesLimit = int64(1_000_000_000) // 1GB
1919

2020
// so far I serialize entire struct and keep only 1 string in ES
@@ -25,32 +25,32 @@ type ElasticDatabaseWithEviction struct {
2525
sizeInBytesLimit int64
2626
}
2727

28-
func NewElasticDatabaseWithEviction(ctx context.Context, cfg config.ElasticsearchConfiguration, indexName string, sizeInBytesLimit int64) *ElasticDatabaseWithEviction {
28+
func NewElasticDatabaseWithEviction(cfg config.ElasticsearchConfiguration, indexName string, sizeInBytesLimit int64) *ElasticDatabaseWithEviction {
2929
return &ElasticDatabaseWithEviction{
30-
ctx: ctx,
30+
ctx: context.Background(),
3131
ElasticJSONDatabase: NewElasticJSONDatabase(cfg, indexName),
3232
EvictorInterface: &Evictor{},
3333
sizeInBytesLimit: sizeInBytesLimit,
3434
}
3535
}
3636

3737
// mutexy? or what
38-
func (db *ElasticDatabaseWithEviction) Put(doc *document) bool {
38+
func (db *ElasticDatabaseWithEviction) Put(ctx context.Context, doc *document) bool {
3939
dbSize, success := db.SizeInBytes()
4040
if !success {
4141
return false
4242
}
4343
fmt.Println("kk dbg Put() dbSize:", dbSize)
4444
bytesNeeded := dbSize + doc.SizeInBytes
4545
if bytesNeeded > db.SizeInBytesLimit() {
46-
logger.InfoWithCtx(db.ctx).Msgf("Database is full, need %d bytes more. Evicting documents", bytesNeeded-db.SizeInBytesLimit())
46+
logger.InfoWithCtx(ctx).Msgf("Database is full, need %d bytes more. Evicting documents", bytesNeeded-db.SizeInBytesLimit())
4747
allDocs, ok := db.getAll()
4848
if !ok {
49-
logger.WarnWithCtx(db.ctx).Msg("Error getting all documents")
49+
logger.WarnWithCtx(ctx).Msg("Error getting all documents")
5050
return false
5151
}
5252
indexesToEvict, bytesEvicted := db.SelectToEvict(allDocs, bytesNeeded-db.SizeInBytesLimit())
53-
logger.InfoWithCtx(db.ctx).Msgf("Evicting %v indexes, %d bytes", indexesToEvict, bytesEvicted)
53+
logger.InfoWithCtx(ctx).Msgf("Evicting %v indexes, %d bytes", indexesToEvict, bytesEvicted)
5454
db.evict(indexesToEvict)
5555
bytesNeeded -= bytesEvicted
5656
}
@@ -69,13 +69,13 @@ func (db *ElasticDatabaseWithEviction) Put(doc *document) bool {
6969

7070
jsonData, err := json.Marshal(updateContent)
7171
if err != nil {
72-
logger.WarnWithCtx(db.ctx).Msgf("Error marshalling document: %v", err)
72+
logger.WarnWithCtx(ctx).Msgf("Error marshalling document: %v", err)
7373
return false
7474
}
7575

7676
resp, err := db.httpClient.Request(context.Background(), "POST", elasticsearchURL, jsonData)
7777
if err != nil {
78-
logger.WarnWithCtx(db.ctx).Msgf("Error sending request to elastic: %v", err)
78+
logger.WarnWithCtx(ctx).Msgf("Error sending request to elastic: %v", err)
7979
return false
8080
}
8181
defer resp.Body.Close()
@@ -86,17 +86,17 @@ func (db *ElasticDatabaseWithEviction) Put(doc *document) bool {
8686
default:
8787
respBody, err := io.ReadAll(resp.Body)
8888
if err != nil {
89-
logger.WarnWithCtx(db.ctx).Msgf("Error reading response body: %v, respBody: %v", err, respBody)
89+
logger.WarnWithCtx(ctx).Msgf("Error reading response body: %v, respBody: %v", err, respBody)
9090
}
9191
return false
9292
}
9393
}
9494

9595
// co zwraca? zrobić switch na oba typy jakie teraz mamy?
96-
func (db *ElasticDatabaseWithEviction) Get(id string) (string, bool) { // probably change return type to *Sizeable
96+
func (db *ElasticDatabaseWithEviction) Get(ctx context.Context, id string) (string, bool) { // probably change return type to *Sizeable
9797
value, success, err := db.ElasticJSONDatabase.Get(id)
9898
if err != nil {
99-
logger.WarnWithCtx(db.ctx).Msgf("Error getting document, id: %s, error: %v", id, err)
99+
logger.WarnWithCtx(ctx).Msgf("Error getting document, id: %s, error: %v", id, err)
100100
return "", false
101101
}
102102
return value, success

quesma/persistence/model.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ type JSONDatabase interface {
1818
Put(key string, data string) error
1919
}
2020

21-
// T - type of the data to store, e.g. async_search_storage.AsyncRequestResult
2221
type JSONDatabaseWithEviction interface { // for sure JSON? maybe not only json? check
2322
Put(doc document) bool
2423
Get(id string) (document, bool)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package async_search_storage
2+
3+
import (
4+
"context"
5+
"quesma/logger"
6+
"quesma/quesma/recovery"
7+
"time"
8+
)
9+
10+
type AsyncQueriesEvictor struct {
11+
ctx context.Context
12+
cancel context.CancelFunc
13+
AsyncRequestStorage AsyncSearchStorageInMemory
14+
AsyncQueriesContexts AsyncQueryContextStorageInMemory
15+
}
16+
17+
func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncSearchStorageInMemory, AsyncQueriesContexts AsyncQueryContextStorageInMemory) *AsyncQueriesEvictor {
18+
ctx, cancel := context.WithCancel(context.Background())
19+
return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts}
20+
}
21+
22+
func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) {
23+
e.AsyncRequestStorage.evict(timeFun)
24+
e.AsyncQueriesContexts.evict(timeFun)
25+
}
26+
27+
func (e *AsyncQueriesEvictor) AsyncQueriesGC() {
28+
defer recovery.LogPanic()
29+
for {
30+
select {
31+
case <-e.ctx.Done():
32+
logger.Debug().Msg("evictor stopped")
33+
return
34+
case <-time.After(GCInterval):
35+
e.tryEvictAsyncRequests(elapsedTime)
36+
}
37+
}
38+
}
39+
40+
func (e *AsyncQueriesEvictor) Close() {
41+
e.cancel()
42+
logger.Info().Msg("AsyncQueriesEvictor Stopped")
43+
}

quesma/quesma/async_search_storage/in_elastic.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,63 @@
22
// SPDX-License-Identifier: Elastic-2.0
33
package async_search_storage
44

5-
// TODO :(
5+
import (
6+
"context"
7+
"quesma/persistence"
8+
"quesma/quesma/config"
9+
)
10+
11+
type AsyncSearchStorageInElastic struct {
12+
db *persistence.ElasticDatabaseWithEviction
13+
}
14+
15+
func NewAsyncSearchStorageInElastic() AsyncSearchStorageInElastic {
16+
return AsyncSearchStorageInElastic{
17+
db: persistence.NewElasticDatabaseWithEviction(
18+
config.ElasticsearchConfiguration{}, "async_search", 1_000_000_000),
19+
}
20+
}
21+
22+
func (s AsyncSearchStorageInElastic) Store(ctx context.Context, id string, result *AsyncRequestResult) {
23+
s.db.Put(ctx, nil)
24+
}
25+
26+
func (s AsyncSearchStorageInElastic) Load(ctx context.Context, id string) (*AsyncRequestResult, bool) {
27+
_, ok := s.db.Get(ctx, id)
28+
return nil, ok
29+
}
30+
31+
func (s AsyncSearchStorageInElastic) Delete(id string) {
32+
s.db.Delete(id)
33+
}
34+
35+
func (s AsyncSearchStorageInElastic) DocCount() int {
36+
cnt, ok := s.db.DocCount()
37+
if !ok {
38+
return -1
39+
}
40+
return cnt
41+
}
42+
43+
func (s AsyncSearchStorageInElastic) SizeInBytes() int64 {
44+
size, ok := s.db.SizeInBytes()
45+
if !ok {
46+
return -1
47+
}
48+
return size
49+
}
50+
51+
type AsyncQueryContextStorageInElastic struct {
52+
db *persistence.ElasticDatabaseWithEviction
53+
}
54+
55+
func NewAsyncQueryContextStorageInElastic() AsyncQueryContextStorageInElastic {
56+
return AsyncQueryContextStorageInElastic{
57+
db: persistence.NewElasticDatabaseWithEviction(
58+
config.ElasticsearchConfiguration{}, "async_search", 1_000_000_000),
59+
}
60+
}
61+
62+
func (s AsyncQueryContextStorageInElastic) Store(ctx context.Context, id string, context *AsyncQueryContext) {
63+
s.db.Put(ctx, nil)
64+
}

quesma/quesma/async_search_storage/in_memory.go

Lines changed: 22 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,11 @@ import (
1212
"time"
1313
)
1414

15-
const EvictionInterval = 15 * time.Minute
16-
const GCInterval = 1 * time.Minute
17-
1815
type AsyncSearchStorageInMemory struct {
1916
idToResult *concurrent.Map[string, *AsyncRequestResult]
2017
}
2118

22-
func NewAsyncSearchStorageInMemory() AsyncSearchStorageInMemory {
19+
func NewAsyncSearchStorageInMemory() AsyncSearchStorageInMemory { // change result type to AsyncRequestResultStorage interface
2320
return AsyncSearchStorageInMemory{
2421
idToResult: concurrent.NewMap[string, *AsyncRequestResult](),
2522
}
@@ -54,6 +51,19 @@ func (s AsyncSearchStorageInMemory) SizeInBytes() int {
5451
return size
5552
}
5653

54+
func (s AsyncSearchStorageInMemory) evict(timeFun func(time.Time) time.Duration) {
55+
var ids []asyncQueryIdWithTime
56+
s.Range(func(key string, value *AsyncRequestResult) bool {
57+
if timeFun(value.added) > EvictionInterval {
58+
ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added})
59+
}
60+
return true
61+
})
62+
for _, id := range ids {
63+
s.Delete(id.id)
64+
}
65+
}
66+
5767
type AsyncQueryContextStorageInMemory struct {
5868
idToContext *concurrent.Map[string, *AsyncQueryContext]
5969
}
@@ -68,40 +78,9 @@ func (s AsyncQueryContextStorageInMemory) Store(id string, context *AsyncQueryCo
6878
s.idToContext.Store(id, context)
6979
}
7080

71-
type AsyncQueriesEvictor struct {
72-
ctx context.Context
73-
cancel context.CancelFunc
74-
AsyncRequestStorage AsyncSearchStorageInMemory
75-
AsyncQueriesContexts AsyncQueryContextStorageInMemory
76-
}
77-
78-
func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncSearchStorageInMemory, AsyncQueriesContexts AsyncQueryContextStorageInMemory) *AsyncQueriesEvictor {
79-
ctx, cancel := context.WithCancel(context.Background())
80-
return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts}
81-
}
82-
83-
func elapsedTime(t time.Time) time.Duration {
84-
return time.Since(t)
85-
}
86-
87-
type asyncQueryIdWithTime struct {
88-
id string
89-
time time.Time
90-
}
91-
92-
func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) {
93-
var ids []asyncQueryIdWithTime
94-
e.AsyncRequestStorage.Range(func(key string, value *AsyncRequestResult) bool {
95-
if timeFun(value.added) > EvictionInterval {
96-
ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added})
97-
}
98-
return true
99-
})
100-
for _, id := range ids {
101-
e.AsyncRequestStorage.idToResult.Delete(id.id)
102-
}
81+
func (s AsyncQueryContextStorageInMemory) evict(timeFun func(time.Time) time.Duration) {
10382
var asyncQueriesContexts []*AsyncQueryContext
104-
e.AsyncQueriesContexts.idToContext.Range(func(key string, value *AsyncQueryContext) bool {
83+
s.idToContext.Range(func(key string, value *AsyncQueryContext) bool {
10584
if timeFun(value.added) > EvictionInterval {
10685
if value != nil {
10786
asyncQueriesContexts = append(asyncQueriesContexts, value)
@@ -111,7 +90,7 @@ func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time
11190
})
11291
evictedIds := make([]string, 0)
11392
for _, asyncQueryContext := range asyncQueriesContexts {
114-
e.AsyncQueriesContexts.idToContext.Delete(asyncQueryContext.id)
93+
s.idToContext.Delete(asyncQueryContext.id)
11594
if asyncQueryContext.cancel != nil {
11695
evictedIds = append(evictedIds, asyncQueryContext.id)
11796
asyncQueryContext.cancel()
@@ -122,22 +101,13 @@ func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time
122101
}
123102
}
124103

125-
func (e *AsyncQueriesEvictor) AsyncQueriesGC() {
126-
defer recovery.LogPanic()
127-
for {
128-
select {
129-
case <-e.ctx.Done():
130-
logger.Debug().Msg("evictor stopped")
131-
return
132-
case <-time.After(GCInterval):
133-
e.tryEvictAsyncRequests(elapsedTime)
134-
}
135-
}
104+
func elapsedTime(t time.Time) time.Duration {
105+
return time.Since(t)
136106
}
137107

138-
func (e *AsyncQueriesEvictor) Close() {
139-
e.cancel()
140-
logger.Info().Msg("AsyncQueriesEvictor Stopped")
108+
type asyncQueryIdWithTime struct {
109+
id string
110+
time time.Time
141111
}
142112

143113
type AsyncQueryTraceLoggerEvictor struct {

quesma/quesma/async_search_storage/model.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,23 @@ import (
77
"time"
88
)
99

10+
const EvictionInterval = 15 * time.Minute
11+
const GCInterval = 1 * time.Minute
12+
1013
type AsyncRequestResultStorage interface {
1114
Store(id string, result *AsyncRequestResult)
1215
Load(id string) (*AsyncRequestResult, bool)
1316
Delete(id string)
1417
DocCount() int
1518
SizeInBytes() uint64
1619
SizeInBytesLimit() uint64
20+
21+
evict(timeFun func(time.Time) time.Duration)
1722
}
1823

19-
// TODO: maybe merge those 2?
2024
type AsyncQueryContextStorage interface {
2125
Store(id string, context *AsyncQueryContext)
26+
evict(timeFun func(time.Time) time.Duration)
2227
}
2328

2429
type AsyncRequestResult struct {

0 commit comments

Comments
 (0)