Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0e25749
some done
trzysiek Oct 15, 2024
622c4cc
Merge branch 'main' into persistent-storage-2
trzysiek Nov 5, 2024
5928f10
Some more done
trzysiek Nov 5, 2024
3661361
Before reverse to 1 index
trzysiek Nov 8, 2024
56245b8
some more
trzysiek Nov 10, 2024
29679c3
Almost
trzysiek Nov 11, 2024
7656f8b
99% done, need to debug tests.
trzysiek Nov 11, 2024
49956b0
Some more
trzysiek Nov 11, 2024
144042a
All tests pass, maybe it works?
trzysiek Nov 11, 2024
a2c621e
Linter
trzysiek Nov 11, 2024
576b195
Minor, now should be all
trzysiek Nov 11, 2024
532f39e
Merge branch 'main' into persistent-storage-2
trzysiek Nov 11, 2024
47229e0
Fix linter
trzysiek Nov 11, 2024
47cc725
Fix config data for smoke test
trzysiek Nov 11, 2024
385fcb3
Debug for smoke test
trzysiek Nov 12, 2024
c37e8e3
Some fixes for smoke test
trzysiek Nov 12, 2024
aa72974
Merge branch 'main' into persistent-storage-2
trzysiek Nov 12, 2024
57982de
unskip all tests
trzysiek Nov 12, 2024
821b872
Reskip tests
trzysiek Nov 12, 2024
a96bd11
Style
trzysiek Nov 12, 2024
3921080
small add test
trzysiek Nov 12, 2024
eb17c02
Merge branch 'main' into persistent-storage-2
trzysiek Dec 20, 2024
b4ef499
Merge branch 'main' into persistent-storage-2
trzysiek Dec 20, 2024
45239e3
Cleanup
trzysiek Dec 20, 2024
51b1e68
Cleanup 2
trzysiek Dec 20, 2024
89a5180
Fix all tests
trzysiek Dec 20, 2024
dc40a90
Fix linter
trzysiek Dec 20, 2024
ffdc79e
Final if manual test passes
trzysiek Dec 20, 2024
cb402dc
Last: 1 name
trzysiek Dec 20, 2024
9afe54a
merge main
nablaone Feb 12, 2025
1499a49
Linter
nablaone Feb 12, 2025
a72f606
Unify error handling
nablaone Feb 12, 2025
6a72ca4
Remove gorountines. Simplification
nablaone Feb 12, 2025
96e9fa6
fmt
nablaone Feb 12, 2025
44b337c
Fix error handling
nablaone Feb 12, 2025
1abc897
Merge branch 'main' into persistent-storage-2
Mar 11, 2025
8419c0e
Fix compilation after merge
Mar 11, 2025
161407b
Merge branch 'main' into persistent-storage-2
trzysiek May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic
logManager: logManager,
publicPort: config.PublicTcpPort,
asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor(
queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory),
queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory),
queryProcessor.AsyncRequestStorage,
queryProcessor.AsyncQueriesContexts,
),
queryRunner: queryProcessor,
}
Expand Down
45 changes: 45 additions & 0 deletions platform/async_search_storage/evictor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package async_search_storage

import (
"context"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/recovery"
"time"
)

type AsyncQueriesEvictor struct {
ctx context.Context
cancel context.CancelFunc
AsyncRequestStorage AsyncRequestResultStorage
AsyncQueriesContexts AsyncQueryContextStorage
}

func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncRequestResultStorage, AsyncQueriesContexts AsyncQueryContextStorage) *AsyncQueriesEvictor {
ctx, cancel := context.WithCancel(context.Background())
return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts}
}

func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(olderThan time.Duration) {
e.AsyncRequestStorage.evict(olderThan)
e.AsyncQueriesContexts.evict(olderThan)
}

func (e *AsyncQueriesEvictor) AsyncQueriesGC() {
defer recovery.LogPanic()
for {
select {
case <-e.ctx.Done():
logger.Debug().Msg("evictor stopped")
return
case <-time.After(gcInterval):
e.tryEvictAsyncRequests(evictionInterval)
}
}
}

func (e *AsyncQueriesEvictor) Close() {
e.cancel()
logger.Info().Msg("AsyncQueriesEvictor Stopped")
}
5 changes: 0 additions & 5 deletions platform/async_search_storage/in_elastic.go

This file was deleted.

131 changes: 131 additions & 0 deletions platform/async_search_storage/in_elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package async_search_storage

import (
"encoding/json"
"fmt"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/persistence"

"time"
)

type AsyncRequestResultStorageInElasticsearch struct {
db *persistence.ElasticDatabaseWithEviction
}

func NewAsyncRequestResultStorageInElasticsearch(cfg config.ElasticsearchConfiguration) AsyncRequestResultStorage {
/* some test config, maybe you'll find it easier to debug with it
realUrl, err := url.Parse("http://localhost:9201")
if err != nil {
fmt.Println("ERR", err)
}
cfgUrl := config.Url(*realUrl)
cfg := config.ElasticsearchConfiguration{
Url: &cfgUrl,
User: "",
Password: "",
}
fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg)
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000),
}
*/
return AsyncRequestResultStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, defaultElasticDbName, defaultElasticDbStorageLimitInBytes),
}
}

func (s AsyncRequestResultStorageInElasticsearch) Store(id string, result *AsyncRequestResult) {
err := s.db.Put(result.toJSON(id))
if err != nil {
logger.Warn().Err(err).Msg("failed to store document")
}
}

func (s AsyncRequestResultStorageInElasticsearch) Load(id string) (*AsyncRequestResult, error) {
resultAsBytes, err := s.db.Get(id)
if err != nil {
return nil, err
}

result := AsyncRequestResult{}
err = json.Unmarshal(resultAsBytes, &result)
if err != nil {
return nil, err
}

return &result, nil
}

func (s AsyncRequestResultStorageInElasticsearch) Delete(id string) {
err := s.db.Delete(id)
if err != nil {
logger.Warn().Err(err).Msg("failed to delete document")
}
}

func (s AsyncRequestResultStorageInElasticsearch) DeleteOld(t time.Duration) {
err := s.db.DeleteOld(t)
if err != nil {
logger.Warn().Err(err).Msg("failed to delete old documents")
}
}

// DocCount returns the number of documents in the database, or -1 if the count could not be retrieved.
func (s AsyncRequestResultStorageInElasticsearch) DocCount() int {
cnt, err := s.db.DocCount()
if err != nil {
logger.Warn().Err(err).Msg("failed to get document count")
return -1
}
return cnt
}

// StorageSizeInBytes returns the total size of all documents in the database, or -1 if the size could not be retrieved.
func (s AsyncRequestResultStorageInElasticsearch) SpaceInUse() int64 {
size, err := s.db.SizeInBytes()
if err != nil {
logger.Warn().Err(err).Msg("failed to get storage size")
return -1
}
return size
}

func (s AsyncRequestResultStorageInElasticsearch) SpaceMaxAvailable() int64 {
return s.db.SizeInBytesLimit()
}

func (s AsyncRequestResultStorageInElasticsearch) evict(evictOlderThan time.Duration) {
err := s.db.DeleteOld(evictOlderThan)
if err != nil {
logger.Warn().Err(err).Msgf("failed to evict documents, err: %v", err)
}
}

type AsyncQueryContextStorageInElasticsearch struct {
db *persistence.ElasticDatabaseWithEviction
}

func NewAsyncQueryContextStorageInElasticsearch(cfg config.ElasticsearchConfiguration) AsyncQueryContextStorage {
fmt.Println("kk dbg NewAsyncQueryContextStorageInElasticsearch() i:", cfg)
return AsyncQueryContextStorageInElasticsearch{
db: persistence.NewElasticDatabaseWithEviction(cfg, "async_search", 1_000_000_000),
}
}

func (s AsyncQueryContextStorageInElasticsearch) Store(context *AsyncQueryContext) {
err := s.db.Put(context.toJSON())
if err != nil {
logger.Warn().Err(err).Msg("failed to store document")
}
}

func (s AsyncQueryContextStorageInElasticsearch) evict(evictOlderThan time.Duration) {
err := s.db.DeleteOld(evictOlderThan)
if err != nil {
logger.Warn().Err(err).Msg("failed to delete old documents")
}
}
116 changes: 47 additions & 69 deletions platform/async_search_storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,92 @@
package async_search_storage

import (
"context"
"fmt"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/recovery"
"github.com/QuesmaOrg/quesma/platform/util"
"math"
"strings"
"time"
)

const EvictionInterval = 15 * time.Minute
const GCInterval = 1 * time.Minute

type AsyncSearchStorageInMemory struct {
type AsyncRequestResultStorageInMemory struct {
idToResult *util.SyncMap[string, *AsyncRequestResult]
}

func NewAsyncSearchStorageInMemory() AsyncSearchStorageInMemory {
return AsyncSearchStorageInMemory{
func NewAsyncRequestResultStorageInMemory() AsyncRequestResultStorage { // change result type to AsyncRequestResultStorage interface
return AsyncRequestResultStorageInMemory{
idToResult: util.NewSyncMap[string, *AsyncRequestResult](),
}
}

func (s AsyncSearchStorageInMemory) Store(id string, result *AsyncRequestResult) {
func (s AsyncRequestResultStorageInMemory) Store(id string, result *AsyncRequestResult) {
s.idToResult.Store(id, result)
}

func (s AsyncSearchStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) {
func (s AsyncRequestResultStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) {
s.idToResult.Range(f)
}

func (s AsyncSearchStorageInMemory) Load(id string) (*AsyncRequestResult, bool) {
return s.idToResult.Load(id)
func (s AsyncRequestResultStorageInMemory) Load(id string) (*AsyncRequestResult, error) {
if val, ok := s.idToResult.Load(id); ok {
return val, nil
}
return nil, fmt.Errorf("key %s not found", id)
}

func (s AsyncSearchStorageInMemory) Delete(id string) {
func (s AsyncRequestResultStorageInMemory) Delete(id string) {
s.idToResult.Delete(id)
}

func (s AsyncSearchStorageInMemory) Size() int {
func (s AsyncRequestResultStorageInMemory) DocCount() int {
return s.idToResult.Size()
}

type AsyncQueryContextStorageInMemory struct {
idToContext *util.SyncMap[string, *AsyncQueryContext]
}

func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorageInMemory {
return AsyncQueryContextStorageInMemory{
idToContext: util.NewSyncMap[string, *AsyncQueryContext](),
}
// in bytes
func (s AsyncRequestResultStorageInMemory) SpaceInUse() int64 {
size := int64(0)
s.Range(func(key string, value *AsyncRequestResult) bool {
size += int64(len(value.ResponseBody))
return true
})
return size
}

func (s AsyncQueryContextStorageInMemory) Store(id string, context *AsyncQueryContext) {
s.idToContext.Store(id, context)
func (s AsyncRequestResultStorageInMemory) SpaceMaxAvailable() int64 {
return math.MaxInt64 / 16 // some huge number for now, can be changed if we want to limit in-memory storage
}

type AsyncQueriesEvictor struct {
ctx context.Context
cancel context.CancelFunc
AsyncRequestStorage AsyncSearchStorageInMemory
AsyncQueriesContexts AsyncQueryContextStorageInMemory
func (s AsyncRequestResultStorageInMemory) evict(evictOlderThan time.Duration) {
var ids []string
s.Range(func(key string, value *AsyncRequestResult) bool {
if time.Since(value.Added) > evictOlderThan {
ids = append(ids, key)
}
return true
})
for _, id := range ids {
s.Delete(id)
}
}

func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncSearchStorageInMemory, AsyncQueriesContexts AsyncQueryContextStorageInMemory) *AsyncQueriesEvictor {
ctx, cancel := context.WithCancel(context.Background())
return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts}
type AsyncQueryContextStorageInMemory struct {
idToContext *util.SyncMap[string, *AsyncQueryContext]
}

func elapsedTime(t time.Time) time.Duration {
return time.Since(t)
func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorage {
return AsyncQueryContextStorageInMemory{
idToContext: util.NewSyncMap[string, *AsyncQueryContext](),
}
}

type asyncQueryIdWithTime struct {
id string
time time.Time
func (s AsyncQueryContextStorageInMemory) Store(context *AsyncQueryContext) {
s.idToContext.Store(context.id, context)
}

func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) {
var ids []asyncQueryIdWithTime
e.AsyncRequestStorage.Range(func(key string, value *AsyncRequestResult) bool {
if timeFun(value.added) > EvictionInterval {
ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added})
}
return true
})
for _, id := range ids {
e.AsyncRequestStorage.idToResult.Delete(id.id)
}
func (s AsyncQueryContextStorageInMemory) evict(evictOlderThan time.Duration) {
var asyncQueriesContexts []*AsyncQueryContext
e.AsyncQueriesContexts.idToContext.Range(func(key string, value *AsyncQueryContext) bool {
if timeFun(value.added) > EvictionInterval {
s.idToContext.Range(func(key string, value *AsyncQueryContext) bool {
if time.Since(value.added) > evictOlderThan {
if value != nil {
asyncQueriesContexts = append(asyncQueriesContexts, value)
}
Expand All @@ -101,7 +97,7 @@ func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time
})
evictedIds := make([]string, 0)
for _, asyncQueryContext := range asyncQueriesContexts {
e.AsyncQueriesContexts.idToContext.Delete(asyncQueryContext.id)
s.idToContext.Delete(asyncQueryContext.id)
if asyncQueryContext.cancel != nil {
evictedIds = append(evictedIds, asyncQueryContext.id)
asyncQueryContext.cancel()
Expand All @@ -111,21 +107,3 @@ func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time
logger.Info().Msgf("Evicted %d async queries : %s", len(evictedIds), strings.Join(evictedIds, ","))
}
}

func (e *AsyncQueriesEvictor) AsyncQueriesGC() {
defer recovery.LogPanic()
for {
select {
case <-e.ctx.Done():
logger.Debug().Msg("evictor stopped")
return
case <-time.After(GCInterval):
e.tryEvictAsyncRequests(elapsedTime)
}
}
}

func (e *AsyncQueriesEvictor) Close() {
e.cancel()
logger.Info().Msg("AsyncQueriesEvictor Stopped")
}
Loading
Loading