Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0e25749
some done
trzysiek Oct 15, 2024
622c4cc
Merge branch 'main' into persistent-storage-2
trzysiek Nov 5, 2024
5928f10
Some more done
trzysiek Nov 5, 2024
3661361
Before reverse to 1 index
trzysiek Nov 8, 2024
56245b8
some more
trzysiek Nov 10, 2024
29679c3
Almost
trzysiek Nov 11, 2024
7656f8b
99% done, need to debug tests.
trzysiek Nov 11, 2024
49956b0
Some more
trzysiek Nov 11, 2024
144042a
All tests pass, maybe it works?
trzysiek Nov 11, 2024
a2c621e
Linter
trzysiek Nov 11, 2024
576b195
Minor, now should be all
trzysiek Nov 11, 2024
532f39e
Merge branch 'main' into persistent-storage-2
trzysiek Nov 11, 2024
47229e0
Fix linter
trzysiek Nov 11, 2024
47cc725
Fix config data for smoke test
trzysiek Nov 11, 2024
385fcb3
Debug for smoke test
trzysiek Nov 12, 2024
c37e8e3
Some fixes for smoke test
trzysiek Nov 12, 2024
aa72974
Merge branch 'main' into persistent-storage-2
trzysiek Nov 12, 2024
57982de
unskip all tests
trzysiek Nov 12, 2024
821b872
Reskip tests
trzysiek Nov 12, 2024
a96bd11
Style
trzysiek Nov 12, 2024
3921080
small add test
trzysiek Nov 12, 2024
eb17c02
Merge branch 'main' into persistent-storage-2
trzysiek Dec 20, 2024
b4ef499
Merge branch 'main' into persistent-storage-2
trzysiek Dec 20, 2024
45239e3
Cleanup
trzysiek Dec 20, 2024
51b1e68
Cleanup 2
trzysiek Dec 20, 2024
89a5180
Fix all tests
trzysiek Dec 20, 2024
dc40a90
Fix linter
trzysiek Dec 20, 2024
ffdc79e
Final if manual test passes
trzysiek Dec 20, 2024
cb402dc
Last: 1 name
trzysiek Dec 20, 2024
9afe54a
merge main
nablaone Feb 12, 2025
1499a49
Linter
nablaone Feb 12, 2025
a72f606
Unify error handling
nablaone Feb 12, 2025
6a72ca4
Remove gorountines. Simplification
nablaone Feb 12, 2025
96e9fa6
fmt
nablaone Feb 12, 2025
44b337c
Fix error handling
nablaone Feb 12, 2025
1abc897
Merge branch 'main' into persistent-storage-2
Mar 11, 2025
8419c0e
Fix compilation after merge
Mar 11, 2025
161407b
Merge branch 'main' into persistent-storage-2
trzysiek May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions quesma/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ func (es *SimpleClient) RequestWithHeaders(ctx context.Context, method, endpoint
return es.doRequest(ctx, method, endpoint, body, headers)
}

func (es *SimpleClient) DoRequestCheckResponseStatusOK(ctx context.Context, method, endpoint string, body []byte) (resp *http.Response, err error) {
resp, err = es.doRequest(ctx, method, endpoint, body, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may close resp.Body here and return the result as a []byte

if err != nil {
return
}

if resp.StatusCode != http.StatusOK {
return resp, fmt.Errorf("response code from Elastic is not 200 OK, but %s", resp.Status)
}
return resp, nil
}

func (es *SimpleClient) Authenticate(ctx context.Context, authHeader string) bool {
var authEndpoint string
// This is really suboptimal, and we should find a better way to set this systematically (config perhaps?)
Expand Down
2 changes: 1 addition & 1 deletion quesma/persistence/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (p *ElasticJSONDatabase) List() ([]string, error) {
var ids []string
// Unmarshal the JSON response
var result map[string]interface{}
if err := json.Unmarshal(jsonAsBytes, &result); err != nil {
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
log.Fatalf("Error parsing the response JSON: %s", err)
}

Expand Down
297 changes: 297 additions & 0 deletions quesma/persistence/elastic_with_eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package persistence

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/k0kubun/pp"
"io"
"math"
"net/http"
"quesma/quesma/config"
"quesma/quesma/types"
"time"
)

// so far I serialize entire struct and keep only 1 string in ES
type ElasticDatabaseWithEviction struct {
ctx context.Context
*ElasticJSONDatabase // maybe remove and copy fields here
// EvictorInterface TODO: rethink how eviction should work, maybe remove
sizeInBytesLimit int64
}

func NewElasticDatabaseWithEviction(cfg config.ElasticsearchConfiguration, indexName string, sizeInBytesLimit int64) *ElasticDatabaseWithEviction {
return &ElasticDatabaseWithEviction{
ctx: context.Background(),
ElasticJSONDatabase: NewElasticJSONDatabase(cfg, indexName),
// EvictorInterface: &Evictor{},
sizeInBytesLimit: sizeInBytesLimit,
}
}

const printDebugElasticDB = false // TODO: remove this + all occurances after final version of the storage

func (db *ElasticDatabaseWithEviction) Put(document *JSONWithSize) error {
dbSize, err := db.SizeInBytes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should cache the db.SizeInBytes() here. This is an extra elastic call.

if err != nil {
return err
}
if printDebugElasticDB {
fmt.Println("kk dbg Put() dbSize:", dbSize)
}
bytesNeeded := dbSize + document.SizeInBytesTotal
if bytesNeeded > db.SizeInBytesLimit() {
return errors.New("elastic database: is full, cannot put document")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can try to perform an aggressive eviction here, and return an error if didn't help.

/*
TODO: restore after eviction readded
logger.Info().Msgf("elastic database: is full, need %d bytes more. Evicting documents", bytesNeeded-db.SizeInBytesLimit())
allDocs, err := db.getAll()
if err != nil {
return err
}
bytesEvicted := db.Evict(allDocs, bytesNeeded-db.SizeInBytesLimit())
logger.Info().Msgf("elastic database: evicted %d bytes", bytesEvicted)
bytesNeeded -= bytesEvicted
*/
}

elasticsearchURL := fmt.Sprintf("%s/_update/%s", db.indexName, document.id)
if printDebugElasticDB {
fmt.Println("kk dbg Put() elasticsearchURL:", elasticsearchURL)
}

updateContent := types.JSON{}
updateContent["doc"] = document.JSON
updateContent["doc_as_upsert"] = true

jsonData, err := json.Marshal(updateContent)
if err != nil {
return err
}

resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodPost, elasticsearchURL, jsonData)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp.Body should be closed.

if printDebugElasticDB {
fmt.Println("kk dbg Put() resp:", resp, "err:", err)
}
if err != nil && (resp == nil || resp.StatusCode != http.StatusCreated) {
return err
}
return nil
}

// Get TODO: probably change return type to some more useful
func (db *ElasticDatabaseWithEviction) Get(id string) ([]byte, error) {
elasticsearchURL := fmt.Sprintf("%s/_source/%s", db.indexName, id)
resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, nil)
if err != nil {
return nil, err
}

defer resp.Body.Close()
return io.ReadAll(resp.Body)
}

func (db *ElasticDatabaseWithEviction) Delete(id string) error {
elasticsearchURL := fmt.Sprintf("%s/_doc/%s", db.indexName, id)
resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodDelete, elasticsearchURL, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp.Body should be closed

if err != nil && (resp == nil || resp.StatusCode != http.StatusCreated) {
return err
}
return nil
}

func (db *ElasticDatabaseWithEviction) DeleteOld(deleteOlderThan time.Duration) (err error) {
if deleteOlderThan < 1*time.Second {
deleteOlderThan = 1 * time.Second
}

rangeStr := fmt.Sprintf("now-%dm", int(math.Floor(deleteOlderThan.Minutes())))
if deleteOlderThan < 5*time.Minute {
rangeStr = fmt.Sprintf("now-%ds", int(math.Floor(deleteOlderThan.Seconds())))
}

elasticsearchURL := fmt.Sprintf("%s/_delete_by_query", db.indexName)
query := fmt.Sprintf(`{
"query": {
"range": {
"added": {
"lte": "%s"
}
}
}
}`, rangeStr)

if printDebugElasticDB {
fmt.Println(query)
}

var resp *http.Response
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp.Body should be closed

resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodPost, elasticsearchURL, []byte(query))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

404 is OK here, but we got:

quesma-1  | Feb 10 15:06:59.942 WRN quesma/quesma/async_search_storage/in_elasticsearch.go:72 > failed to delete old documents error="response code from Elastic is not 200 OK, but 404 Not Found"

if printDebugElasticDB {
fmt.Println("kk dbg DocCount() resp:", resp, "err:", err, "elastic url:", elasticsearchURL)
}
return err
}

func (db *ElasticDatabaseWithEviction) DocCount() (docCount int, err error) {
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)
query := `{
"_source": false,
"size": 0,
"track_total_hits": true
}`

var resp *http.Response
resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, []byte(query))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp.Body should be closed

if printDebugElasticDB {
fmt.Println("kk dbg DocCount() resp:", resp, "err:", err, "elastic url:", elasticsearchURL)
}
if err != nil {
if resp != nil && (resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusNotFound) {
return 0, nil
}
return -1, err
}

var jsonAsBytes []byte
jsonAsBytes, err = io.ReadAll(resp.Body)
if err != nil {
return
}

// Unmarshal the JSON response
var result map[string]interface{}
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
return
}

if printDebugElasticDB {
fmt.Println("kk dbg DocCount() result:", result)
}

return int(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)), nil // TODO: add some checks... to prevent panic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO is right. We should check if types are ok, before casting.

}

func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, err error) {
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)
// TODO change query to aggregation
query := `{
"_source": ["sizeInBytes"],
"size": 10000,
"track_total_hits": true
}`

var resp *http.Response
resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, []byte(query))
if printDebugElasticDB {
fmt.Println("kk dbg SizeInBytes() err:", err, "\nresp:", resp)
}
if err != nil {
if resp != nil && resp.StatusCode == 404 {
return 0, nil
}
return
}
defer resp.Body.Close() // add everywhere

var jsonAsBytes []byte
jsonAsBytes, err = io.ReadAll(resp.Body)
if err != nil {
return
}

if printDebugElasticDB {
fmt.Println("kk dbg SizeInBytes() resp.StatusCode:", resp.StatusCode)
}

// Unmarshal the JSON response
var result map[string]interface{}
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
return
}

sizes := make([]int64, 0)
for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) {
if printDebugElasticDB {
pp.Println("hit:", hit)
}
b := sizeInBytes
sizeInBytes += int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)) // TODO: add checks
sizes = append(sizes, sizeInBytes-b)
}
if printDebugElasticDB {
fmt.Println("kk dbg SizeInBytes() sizes in storage:", sizes)
}
return sizeInBytes, nil
}

func (db *ElasticDatabaseWithEviction) SizeInBytesLimit() int64 {
return db.sizeInBytesLimit
}

/* TODO: restore after eviction readded, or remove
func (db *ElasticDatabaseWithEviction) getAll() (documents []*JSONWithSize, err error) {
_ = fmt.Sprintf("%s/_search", db.indexName)
_ = `{
"_source": {
"excludes": "data"
},
"size": 10000,
"track_total_hits": true
}`
db.httpClient.
resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
if err != nil {
return
}
defer resp.Body.Close()
jsonAsBytes, err := io.ReadAll(resp.Body)
if err != nil {
return
}
fmt.Println("kk dbg getAll() resp.StatusCode:", resp.StatusCode)
switch resp.StatusCode {
case http.StatusOK:
break
default:
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
return
}
// Unmarshal the JSON response
var result map[string]interface{}
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
return
}
fmt.Println("kk dbg getAll() documents:")
for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) {
doc := &document{
Id: hit.(map[string]interface{})["_id"].(string),
Index: hit.(map[string]interface{})["_index"].(string),
SizeInBytes: int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)), // TODO: add checks
//Timestamp: hit.(map[string]interface{})["_source"].(map[string]interface{})["timestamp"].(time.Time), // TODO: add checks
MarkedAsDeleted: hit.(map[string]interface{})["_source"].(map[string]interface{})["markedAsDeleted"].(bool), // TODO: add checks
}
fmt.Println(doc)
documents = append(documents, doc)
}
return documents, nil
}
*/

func (db *ElasticDatabaseWithEviction) fullIndexName() string {
now := time.Now().UTC()
return fmt.Sprintf("%s-%d-%d-%d-%d-%d-%d", db.indexName, now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second())
}
17 changes: 17 additions & 0 deletions quesma/persistence/evictor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package persistence

type EvictorInterface interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used. We may delete it.

Evict(documents []*JSONWithSize, sizeNeeded int64) (bytesEvicted int64)
}

// TODO: Find out how this might work. My old idea doesn't work now,
// as don't remove entire indices, but delete single documents.
// (It turned out consistency was too eventual to rely on it)
// old comment: It's only 1 implementation, which looks well suited for ElasticSearch. It can be implemented differently.
type Evictor struct{}

func (e *Evictor) Evict(documents []*JSONWithSize, sizeNeeded int64) (bytesEvicted int64) {
panic("implement me (or remove)")
}
Loading
Loading