Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 576b195

Browse files
committed
Minor, now should be all
1 parent a2c621e commit 576b195

File tree

8 files changed

+141
-104
lines changed

8 files changed

+141
-104
lines changed

quesma/quesma/async_search_storage/in_elastic.go renamed to quesma/quesma/async_search_storage/in_elasticsearch.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import (
1414
"time"
1515
)
1616

17-
type AsyncSearchStorageInElastic struct {
17+
type AsyncRequestResultStorageInElasticsearch struct {
1818
db *persistence.ElasticDatabaseWithEviction
1919
}
2020

21-
func NewAsyncSearchStorageInElastic() AsyncSearchStorageInElastic {
21+
func NewAsyncRequestResultStorageInElasticsearch() AsyncRequestResultStorage {
2222
// TODO use passed config
2323
realUrl, err := url.Parse("http://localhost:9200")
2424
if err != nil {
@@ -31,19 +31,19 @@ func NewAsyncSearchStorageInElastic() AsyncSearchStorageInElastic {
3131
Password: "",
3232
}
3333
i := rand.Int()
34-
return AsyncSearchStorageInElastic{
34+
return AsyncRequestResultStorageInElasticsearch{
3535
db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000),
3636
}
3737
}
3838

39-
func (s AsyncSearchStorageInElastic) Store(id string, result *AsyncRequestResult) {
39+
func (s AsyncRequestResultStorageInElasticsearch) Store(id string, result *AsyncRequestResult) {
4040
err := s.db.Put(result.toJSON(id))
4141
if err != nil {
4242
logger.Warn().Err(err).Msg("failed to store document")
4343
}
4444
}
4545

46-
func (s AsyncSearchStorageInElastic) Load(id string) (*AsyncRequestResult, error) {
46+
func (s AsyncRequestResultStorageInElasticsearch) Load(id string) (*AsyncRequestResult, error) {
4747
resultAsBytes, err := s.db.Get(id)
4848
if err != nil {
4949
return nil, err
@@ -58,22 +58,22 @@ func (s AsyncSearchStorageInElastic) Load(id string) (*AsyncRequestResult, error
5858
return &result, nil
5959
}
6060

61-
func (s AsyncSearchStorageInElastic) Delete(id string) {
61+
func (s AsyncRequestResultStorageInElasticsearch) Delete(id string) {
6262
err := s.db.Delete(id)
6363
if err != nil {
6464
logger.Warn().Err(err).Msg("failed to delete document")
6565
}
6666
}
6767

68-
func (s AsyncSearchStorageInElastic) DeleteOld(t time.Duration) {
68+
func (s AsyncRequestResultStorageInElasticsearch) DeleteOld(t time.Duration) {
6969
err := s.db.DeleteOld(t)
7070
if err != nil {
7171
logger.Warn().Err(err).Msg("failed to delete old documents")
7272
}
7373
}
7474

7575
// DocCount returns the number of documents in the database, or -1 if the count could not be retrieved.
76-
func (s AsyncSearchStorageInElastic) DocCount() int {
76+
func (s AsyncRequestResultStorageInElasticsearch) DocCount() int {
7777
cnt, err := s.db.DocCount()
7878
if err != nil {
7979
logger.Warn().Err(err).Msg("failed to get document count")
@@ -83,7 +83,7 @@ func (s AsyncSearchStorageInElastic) DocCount() int {
8383
}
8484

8585
// StorageSizeInBytes returns the total size of all documents in the database, or -1 if the size could not be retrieved.
86-
func (s AsyncSearchStorageInElastic) SpaceInUse() int64 {
86+
func (s AsyncRequestResultStorageInElasticsearch) SpaceInUse() int64 {
8787
size, err := s.db.SizeInBytes()
8888
if err != nil {
8989
logger.Warn().Err(err).Msg("failed to get storage size")
@@ -92,24 +92,38 @@ func (s AsyncSearchStorageInElastic) SpaceInUse() int64 {
9292
return size
9393
}
9494

95-
func (s AsyncSearchStorageInElastic) SpaceMaxAvailable() int64 {
95+
func (s AsyncRequestResultStorageInElasticsearch) SpaceMaxAvailable() int64 {
9696
return s.db.SizeInBytesLimit()
9797
}
9898

99-
func (s AsyncSearchStorageInElastic) evict(evictOlderThan time.Duration) {
99+
func (s AsyncRequestResultStorageInElasticsearch) evict(evictOlderThan time.Duration) {
100100
err := s.db.DeleteOld(evictOlderThan)
101101
if err != nil {
102102
logger.Warn().Err(err).Msgf("failed to evict documents, err: %v", err)
103103
}
104104
}
105105

106-
type AsyncQueryContextStorageInElastic struct {
106+
type AsyncQueryContextStorageInElasticsearch struct {
107107
db *persistence.ElasticDatabaseWithEviction
108108
}
109109

110-
func NewAsyncQueryContextStorageInElastic() AsyncQueryContextStorageInElastic {
111-
return AsyncQueryContextStorageInElastic{
110+
func NewAsyncQueryContextStorageInElasticsearch() AsyncQueryContextStorage {
111+
return AsyncQueryContextStorageInElasticsearch{
112112
db: persistence.NewElasticDatabaseWithEviction(
113113
config.ElasticsearchConfiguration{}, "async_search", 1_000_000_000),
114114
}
115115
}
116+
117+
func (s AsyncQueryContextStorageInElasticsearch) Store(context *AsyncQueryContext) {
118+
err := s.db.Put(context.toJSON())
119+
if err != nil {
120+
logger.Warn().Err(err).Msg("failed to store document")
121+
}
122+
}
123+
124+
func (s AsyncQueryContextStorageInElasticsearch) evict(evictOlderThan time.Duration) {
125+
err := s.db.DeleteOld(evictOlderThan)
126+
if err != nil {
127+
logger.Warn().Err(err).Msg("failed to delete old documents")
128+
}
129+
}

quesma/quesma/async_search_storage/in_memory.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,41 @@ import (
1414
"time"
1515
)
1616

17-
type AsyncSearchStorageInMemory struct {
17+
type AsyncRequestResultStorageInMemory struct {
1818
idToResult *concurrent.Map[string, *AsyncRequestResult]
1919
}
2020

21-
func NewAsyncSearchStorageInMemory() AsyncSearchStorageInMemory { // change result type to AsyncRequestResultStorage interface
22-
return AsyncSearchStorageInMemory{
21+
func NewAsyncRequestResultStorageInMemory() AsyncRequestResultStorage { // change result type to AsyncRequestResultStorage interface
22+
return AsyncRequestResultStorageInMemory{
2323
idToResult: concurrent.NewMap[string, *AsyncRequestResult](),
2424
}
2525
}
2626

27-
func (s AsyncSearchStorageInMemory) Store(id string, result *AsyncRequestResult) {
27+
func (s AsyncRequestResultStorageInMemory) Store(id string, result *AsyncRequestResult) {
2828
s.idToResult.Store(id, result)
2929
}
3030

31-
func (s AsyncSearchStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) {
31+
func (s AsyncRequestResultStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) {
3232
s.idToResult.Range(f)
3333
}
3434

35-
func (s AsyncSearchStorageInMemory) Load(id string) (*AsyncRequestResult, error) {
35+
func (s AsyncRequestResultStorageInMemory) Load(id string) (*AsyncRequestResult, error) {
3636
if val, ok := s.idToResult.Load(id); ok {
3737
return val, nil
3838
}
3939
return nil, fmt.Errorf("key %s not found", id)
4040
}
4141

42-
func (s AsyncSearchStorageInMemory) Delete(id string) {
42+
func (s AsyncRequestResultStorageInMemory) Delete(id string) {
4343
s.idToResult.Delete(id)
4444
}
4545

46-
func (s AsyncSearchStorageInMemory) DocCount() int {
46+
func (s AsyncRequestResultStorageInMemory) DocCount() int {
4747
return s.idToResult.Size()
4848
}
4949

5050
// in bytes
51-
func (s AsyncSearchStorageInMemory) SpaceInUse() int64 {
51+
func (s AsyncRequestResultStorageInMemory) SpaceInUse() int64 {
5252
size := int64(0)
5353
s.Range(func(key string, value *AsyncRequestResult) bool {
5454
size += int64(len(value.GetResponseBody()))
@@ -57,11 +57,11 @@ func (s AsyncSearchStorageInMemory) SpaceInUse() int64 {
5757
return size
5858
}
5959

60-
func (s AsyncSearchStorageInMemory) SpaceMaxAvailable() int64 {
60+
func (s AsyncRequestResultStorageInMemory) SpaceMaxAvailable() int64 {
6161
return math.MaxInt64 / 16 // some huge number for now, can be changed if we want to limit in-memory storage
6262
}
6363

64-
func (s AsyncSearchStorageInMemory) evict(evictOlderThan time.Duration) {
64+
func (s AsyncRequestResultStorageInMemory) evict(evictOlderThan time.Duration) {
6565
var ids []string
6666
s.Range(func(key string, value *AsyncRequestResult) bool {
6767
if time.Since(value.added) > evictOlderThan {
@@ -78,14 +78,14 @@ type AsyncQueryContextStorageInMemory struct {
7878
idToContext *concurrent.Map[string, *AsyncQueryContext]
7979
}
8080

81-
func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorageInMemory {
81+
func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorage {
8282
return AsyncQueryContextStorageInMemory{
8383
idToContext: concurrent.NewMap[string, *AsyncQueryContext](),
8484
}
8585
}
8686

87-
func (s AsyncQueryContextStorageInMemory) Store(id string, context *AsyncQueryContext) {
88-
s.idToContext.Store(id, context)
87+
func (s AsyncQueryContextStorageInMemory) Store(context *AsyncQueryContext) {
88+
s.idToContext.Store(context.id, context)
8989
}
9090

9191
func (s AsyncQueryContextStorageInMemory) evict(evictOlderThan time.Duration) {

quesma/quesma/async_search_storage/in_memory_fallback_elastic.go

Lines changed: 0 additions & 55 deletions
This file was deleted.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package async_search_storage
4+
5+
import (
6+
"time"
7+
)
8+
9+
type AsyncRequestResultStorageInMemoryFallbackElastic struct {
10+
inMemory AsyncRequestResultStorageInMemory
11+
inElasticsearch AsyncRequestResultStorageInElasticsearch
12+
}
13+
14+
func NewAsyncSearchStorageInMemoryFallbackElastic() AsyncRequestResultStorageInMemoryFallbackElastic {
15+
return AsyncRequestResultStorageInMemoryFallbackElastic{
16+
inMemory: NewAsyncRequestResultStorageInMemory().(AsyncRequestResultStorageInMemory),
17+
inElasticsearch: NewAsyncRequestResultStorageInElasticsearch().(AsyncRequestResultStorageInElasticsearch),
18+
}
19+
}
20+
21+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) Store(id string, result *AsyncRequestResult) {
22+
s.inMemory.Store(id, result)
23+
go s.inElasticsearch.Store(id, result)
24+
}
25+
26+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) Load(id string) (*AsyncRequestResult, error) {
27+
result, err := s.inMemory.Load(id)
28+
if err == nil {
29+
return result, nil
30+
}
31+
return s.inElasticsearch.Load(id)
32+
}
33+
34+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) Delete(id string) {
35+
s.inMemory.Delete(id)
36+
go s.inElasticsearch.Delete(id)
37+
}
38+
39+
// DocCount returns inMemory doc count
40+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) DocCount() int {
41+
return s.inMemory.DocCount()
42+
}
43+
44+
// SpaceInUse returns inMemory size in bytes
45+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) SpaceInUse() int64 {
46+
return s.inMemory.SpaceInUse()
47+
}
48+
49+
// SpaceMaxAvailable returns inMemory size in bytes limit
50+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) SpaceMaxAvailable() int64 {
51+
return s.inMemory.SpaceMaxAvailable()
52+
}
53+
54+
func (s AsyncRequestResultStorageInMemoryFallbackElastic) evict(olderThan time.Duration) {
55+
s.inMemory.evict(olderThan)
56+
go s.inElasticsearch.DeleteOld(olderThan)
57+
}
58+
59+
type AsyncQueryContextStorageInMemoryFallbackElasticsearch struct {
60+
inMemory AsyncQueryContextStorageInMemory
61+
inElasticsearch AsyncQueryContextStorageInElasticsearch
62+
}
63+
64+
func NewAsyncQueryContextStorageInMemoryFallbackElasticsearch() AsyncQueryContextStorage {
65+
return AsyncQueryContextStorageInMemoryFallbackElasticsearch{
66+
inMemory: NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory),
67+
inElasticsearch: NewAsyncQueryContextStorageInElasticsearch().(AsyncQueryContextStorageInElasticsearch),
68+
}
69+
}
70+
71+
func (s AsyncQueryContextStorageInMemoryFallbackElasticsearch) Store(context *AsyncQueryContext) {
72+
s.inMemory.Store(context)
73+
go s.inElasticsearch.Store(context)
74+
}
75+
76+
func (s AsyncQueryContextStorageInMemoryFallbackElasticsearch) evict(evictOlderThan time.Duration) {
77+
s.inMemory.evict(evictOlderThan)
78+
go s.inElasticsearch.evict(evictOlderThan)
79+
}

quesma/quesma/async_search_storage/in_memory_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ import (
1515
func TestAsyncQueriesEvictorTimePassed(t *testing.T) {
1616
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
1717
storageKinds := []AsyncRequestResultStorage{
18-
NewAsyncSearchStorageInMemory(),
19-
NewAsyncSearchStorageInElastic(),
18+
NewAsyncRequestResultStorageInMemory(),
19+
NewAsyncRequestResultStorageInElasticsearch(),
2020
NewAsyncSearchStorageInMemoryFallbackElastic(),
2121
}
2222
for _, storage := range storageKinds {
23-
queryContextStorage := NewAsyncQueryContextStorageInMemory()
23+
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
2424
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
2525
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
2626
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()})
@@ -38,12 +38,12 @@ func TestAsyncQueriesEvictorTimePassed(t *testing.T) {
3838
func TestAsyncQueriesEvictorStillAlive(t *testing.T) {
3939
// TODO: add also 3rd storage and nice test for it (remove from memory, but still in elastic)
4040
storageKinds := []AsyncRequestResultStorage{
41-
NewAsyncSearchStorageInMemory(),
42-
NewAsyncSearchStorageInElastic(),
41+
NewAsyncRequestResultStorageInMemory(),
42+
NewAsyncRequestResultStorageInElasticsearch(),
4343
NewAsyncSearchStorageInMemoryFallbackElastic(),
4444
}
4545
for _, storage := range storageKinds {
46-
queryContextStorage := NewAsyncQueryContextStorageInMemory()
46+
queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory)
4747
queryContextStorage.idToContext.Store("1", &AsyncQueryContext{})
4848
evictor := NewAsyncQueriesEvictor(storage, queryContextStorage)
4949
evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()})
@@ -64,25 +64,25 @@ func TestInMemoryFallbackElasticStorage(t *testing.T) {
6464
storage.Store("2", &AsyncRequestResult{})
6565
storage.Store("3", &AsyncRequestResult{})
6666

67-
assert.Equal(t, 0, storage.elastic.DocCount()) // elastic is async, probably shouldn't be updated yet
67+
assert.Equal(t, 0, storage.inElasticsearch.DocCount()) // inElasticsearch is async, probably shouldn't be updated yet
6868
assert.Equal(t, 3, storage.inMemory.DocCount())
6969
time.Sleep(2 * time.Second)
70-
assert.Equal(t, 3, storage.elastic.DocCount())
70+
assert.Equal(t, 3, storage.inElasticsearch.DocCount())
7171
assert.Equal(t, 3, storage.DocCount())
7272

7373
storage.Delete("1")
7474
storage.Delete("2")
7575
assert.Equal(t, 1, storage.DocCount())
7676
assert.Equal(t, 1, storage.inMemory.DocCount())
77-
assert.Equal(t, 3, storage.elastic.DocCount()) // elastic is async, probably shouldn't be updated yet
77+
assert.Equal(t, 3, storage.inElasticsearch.DocCount()) // inElasticsearch is async, probably shouldn't be updated yet
7878
time.Sleep(2 * time.Second)
79-
assert.Equal(t, 1, storage.elastic.DocCount())
79+
assert.Equal(t, 1, storage.inElasticsearch.DocCount())
8080
assert.Equal(t, 1, storage.DocCount())
8181

8282
// simulate Quesma, and inMemory storage restart
83-
storage.inMemory = NewAsyncSearchStorageInMemory()
83+
storage.inMemory = NewAsyncRequestResultStorageInMemory().(AsyncRequestResultStorageInMemory)
8484
assert.Equal(t, 0, storage.DocCount())
85-
assert.Equal(t, 1, storage.elastic.DocCount())
85+
assert.Equal(t, 1, storage.inElasticsearch.DocCount())
8686

8787
doc, err := storage.Load("1")
8888
pp.Println(err, doc)

0 commit comments

Comments
 (0)