Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 61e367a

Browse files
committed
Almost
1 parent 56245b8 commit 61e367a

File tree

9 files changed

+251
-169
lines changed

9 files changed

+251
-169
lines changed

quesma/elasticsearch/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ func (es *SimpleClient) RequestWithHeaders(ctx context.Context, method, endpoint
4040
return es.doRequest(ctx, method, endpoint, body, headers)
4141
}
4242

43+
func (es *SimpleClient) DoRequestCheckResponseStatus(ctx context.Context, method, endpoint string, body []byte) (resp *http.Response, err error) {
44+
resp, err = es.doRequest(ctx, "GET", endpoint, body, nil)
45+
if err != nil {
46+
return
47+
}
48+
defer resp.Body.Close()
49+
50+
if resp.StatusCode != http.StatusOK {
51+
return resp, fmt.Errorf("response code from Elastic is not 200 OK, but %s", resp.Status)
52+
}
53+
return resp, nil
54+
}
55+
4356
func (es *SimpleClient) Authenticate(ctx context.Context, authHeader string) bool {
4457
resp, err := es.doRequest(ctx, "GET", "_security/_authenticate", nil, http.Header{"Authorization": {authHeader}})
4558
if err != nil {

quesma/persistence/elastic_with_eviction.go

Lines changed: 88 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package persistence
55
import (
66
"context"
77
"encoding/json"
8+
"errors"
89
"fmt"
910
"io"
1011
"net/http"
@@ -14,8 +15,8 @@ import (
1415
"time"
1516
)
1617

17-
const MAX_DOC_COUNT = 10000 // TODO: fix/make configurable/idk/etc
18-
const defaultSizeInBytesLimit = int64(1_000_000_000) // 1GB
18+
const MAX_DOC_COUNT = 10000 // TODO: fix/make configurable/idk/etc
19+
const defaultHugeSizeInBytesLimit = int64(500_000_000_000) // 500GB
1920

2021
// so far I serialize entire struct and keep only 1 string in ES
2122
type ElasticDatabaseWithEviction struct {
@@ -34,32 +35,28 @@ func NewElasticDatabaseWithEviction(cfg config.ElasticsearchConfiguration, index
3435
}
3536
}
3637

37-
// mutexy? or what
38-
func (db *ElasticDatabaseWithEviction) Put(ctx context.Context, doc *document) bool {
39-
dbSize, success := db.SizeInBytes()
40-
if !success {
41-
return false
38+
func (db *ElasticDatabaseWithEviction) Put(doc *document) error {
39+
dbSize, err := db.SizeInBytes()
40+
if err != nil {
41+
return err
4242
}
4343
fmt.Println("kk dbg Put() dbSize:", dbSize)
4444
bytesNeeded := dbSize + doc.SizeInBytes
4545
if bytesNeeded > db.SizeInBytesLimit() {
46-
logger.InfoWithCtx(ctx).Msgf("Database is full, need %d bytes more. Evicting documents", bytesNeeded-db.SizeInBytesLimit())
47-
allDocs, ok := db.getAll()
48-
if !ok {
49-
logger.WarnWithCtx(ctx).Msg("Error getting all documents")
50-
return false
46+
logger.Info().Msgf("elastic database: is full, need %d bytes more. Evicting documents", bytesNeeded-db.SizeInBytesLimit())
47+
allDocs, err := db.getAll()
48+
if err != nil {
49+
return err
5150
}
5251
indexesToEvict, bytesEvicted := db.SelectToEvict(allDocs, bytesNeeded-db.SizeInBytesLimit())
53-
logger.InfoWithCtx(ctx).Msgf("Evicting %v indexes, %d bytes", indexesToEvict, bytesEvicted)
52+
logger.Info().Msgf("elastic database: evicting %v indexes, %d bytes", indexesToEvict, bytesEvicted)
5453
db.evict(indexesToEvict)
5554
bytesNeeded -= bytesEvicted
5655
}
5756
if bytesNeeded > db.SizeInBytesLimit() {
58-
// put document
59-
return false
57+
return errors.New("elastic database: is full, cannot put document")
6058
}
6159

62-
//elasticsearchURL := fmt.Sprintf("%s/_update/%s", db.fullIndexName(), doc.Id)
6360
elasticsearchURL := fmt.Sprintf("%s/_update/%s", db.indexName, doc.Id)
6461
fmt.Println("kk dbg Put() elasticsearchURL:", elasticsearchURL)
6562

@@ -69,40 +66,23 @@ func (db *ElasticDatabaseWithEviction) Put(ctx context.Context, doc *document) b
6966

7067
jsonData, err := json.Marshal(updateContent)
7168
if err != nil {
72-
logger.WarnWithCtx(ctx).Msgf("Error marshalling document: %v", err)
73-
return false
74-
}
75-
76-
resp, err := db.httpClient.Request(context.Background(), "POST", elasticsearchURL, jsonData)
77-
if err != nil {
78-
logger.WarnWithCtx(ctx).Msgf("Error sending request to elastic: %v", err)
79-
return false
69+
return err
8070
}
81-
defer resp.Body.Close()
8271

83-
switch resp.StatusCode {
84-
case http.StatusCreated, http.StatusOK:
85-
return true
86-
default:
87-
respBody, err := io.ReadAll(resp.Body)
88-
if err != nil {
89-
logger.WarnWithCtx(ctx).Msgf("Error reading response body: %v, respBody: %v", err, respBody)
90-
}
91-
return false
72+
resp, err := db.httpClient.DoRequestCheckResponseStatus(context.Background(), http.MethodPost, elasticsearchURL, jsonData)
73+
if err != nil && resp.StatusCode != http.StatusCreated {
74+
return err
9275
}
76+
return nil
9377
}
9478

9579
// co zwraca? zrobić switch na oba typy jakie teraz mamy?
96-
func (db *ElasticDatabaseWithEviction) Get(ctx context.Context, id string) (string, bool) { // probably change return type to *Sizeable
97-
value, success, err := db.ElasticJSONDatabase.Get(id)
98-
if err != nil {
99-
logger.WarnWithCtx(ctx).Msgf("Error getting document, id: %s, error: %v", id, err)
100-
return "", false
101-
}
102-
return value, success
80+
func (db *ElasticDatabaseWithEviction) Get(id string) (string, error) { // probably change return type to *Sizeable
81+
value, _, err := db.ElasticJSONDatabase.Get(id)
82+
return value, err
10383
}
10484

105-
func (db *ElasticDatabaseWithEviction) Delete(id string) bool {
85+
func (db *ElasticDatabaseWithEviction) Delete(id string) error {
10686
// mark as deleted, don't actually delete
10787
// (single document deletion is hard in ES, it's done by evictor for entire index)
10888

@@ -115,30 +95,21 @@ func (db *ElasticDatabaseWithEviction) Delete(id string) bool {
11595

11696
jsonData, err := json.Marshal(updateContent)
11797
if err != nil {
118-
logger.WarnWithCtx(db.ctx).Msgf("Error marshalling document: %v", err)
119-
return false
98+
return err
12099
}
121100

122-
resp, err := db.httpClient.Request(context.Background(), "POST", elasticsearchURL, jsonData)
123-
if err != nil {
124-
logger.WarnWithCtx(db.ctx).Msgf("Error sending request to elastic: %v", err)
125-
return false
101+
resp, err := db.httpClient.DoRequestCheckResponseStatus(context.Background(), http.MethodPost, elasticsearchURL, jsonData)
102+
if err != nil && resp.StatusCode != http.StatusCreated {
103+
return err
126104
}
127-
defer resp.Body.Close()
105+
return nil
106+
}
128107

129-
switch resp.StatusCode {
130-
case http.StatusCreated, http.StatusOK:
131-
return true
132-
default:
133-
respBody, err := io.ReadAll(resp.Body)
134-
if err != nil {
135-
logger.WarnWithCtx(db.ctx).Msgf("Error reading response body: %v, respBody: %v", err, respBody)
136-
}
137-
return false
138-
}
108+
func (db *ElasticDatabaseWithEviction) DeleteOld(deleteOlderThan time.Duration) error {
109+
return nil
139110
}
140111

141-
func (db *ElasticDatabaseWithEviction) DocCount() (count int, success bool) {
112+
func (db *ElasticDatabaseWithEviction) DocCount() (docCount int, err error) {
142113
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)
143114
query := `{
144115
"_source": false,
@@ -153,77 +124,57 @@ func (db *ElasticDatabaseWithEviction) DocCount() (count int, success bool) {
153124
}
154125
}`
155126

156-
resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
127+
var resp *http.Response
128+
resp, err = db.httpClient.DoRequestCheckResponseStatus(context.Background(), http.MethodGet, elasticsearchURL, []byte(query))
157129
if err != nil {
158-
return
130+
if resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusNotFound {
131+
return 0, nil
132+
}
133+
return -1, err
159134
}
160-
defer resp.Body.Close()
161135

162-
jsonAsBytes, err := io.ReadAll(resp.Body)
136+
var jsonAsBytes []byte
137+
jsonAsBytes, err = io.ReadAll(resp.Body)
163138
if err != nil {
164139
return
165140
}
166141

167-
fmt.Println("kk dbg DocCount() resp.StatusCode:", resp.StatusCode)
168-
169-
switch resp.StatusCode {
170-
case http.StatusOK:
171-
break
172-
case http.StatusNoContent, http.StatusNotFound:
173-
return 0, true
174-
default:
175-
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
176-
return
177-
}
178-
179142
// Unmarshal the JSON response
180143
var result map[string]interface{}
181144
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
182-
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
183145
return
184146
}
185147

186148
fmt.Println("kk dbg DocCount() result:", result)
187149

188-
count = int(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) // TODO: add some checks... to prevent panic
189-
return count, true
150+
return int(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)), nil // TODO: add some checks... to prevent panic
190151
}
191152

192-
func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, success bool) {
153+
func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, err error) {
193154
elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName)
194155
query := `{
195156
"_source": ["sizeInBytes"],
196157
"size": 10000,
197158
"track_total_hits": true
198159
}`
199160

200-
resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
161+
var resp *http.Response
162+
resp, err = db.httpClient.DoRequestCheckResponseStatus(context.Background(), http.MethodGet, elasticsearchURL, []byte(query))
201163
if err != nil {
202164
return
203165
}
204-
defer resp.Body.Close()
205166

206-
jsonAsBytes, err := io.ReadAll(resp.Body)
167+
var jsonAsBytes []byte
168+
jsonAsBytes, err = io.ReadAll(resp.Body)
207169
if err != nil {
208170
return
209171
}
210172

211173
fmt.Println("kk dbg SizeInBytes() resp.StatusCode:", resp.StatusCode)
212174

213-
switch resp.StatusCode {
214-
case http.StatusOK:
215-
break
216-
case http.StatusNoContent, http.StatusNotFound:
217-
return 0, true
218-
default:
219-
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
220-
return
221-
}
222-
223175
// Unmarshal the JSON response
224176
var result map[string]interface{}
225177
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
226-
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
227178
return
228179
}
229180

@@ -234,64 +185,68 @@ func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, success
234185
a = append(a, sizeInBytes-b)
235186
}
236187
fmt.Println("kk dbg SizeInBytes() sizes in storage:", a)
237-
return sizeInBytes, true
188+
return sizeInBytes, nil
238189
}
239190

240191
func (db *ElasticDatabaseWithEviction) SizeInBytesLimit() int64 {
241192
return db.sizeInBytesLimit
242193
}
243194

244-
func (db *ElasticDatabaseWithEviction) getAll() (documents []*document, success bool) {
245-
elasticsearchURL := fmt.Sprintf("%s*/_search", db.indexName)
246-
query := `{
195+
func (db *ElasticDatabaseWithEviction) getAll() (documents []*document, err error) {
196+
_ = fmt.Sprintf("%s*/_search", db.indexName)
197+
_ = `{
247198
"_source": {
248199
"excludes": "data"
249200
},
250201
"size": 10000,
251202
"track_total_hits": true
252203
}`
204+
/*
205+
db.httpClient.
253206
254-
resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
255-
if err != nil {
256-
return
257-
}
258-
defer resp.Body.Close()
207+
resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query))
208+
if err != nil {
209+
return
210+
}
211+
defer resp.Body.Close()
259212
260-
jsonAsBytes, err := io.ReadAll(resp.Body)
261-
if err != nil {
262-
return
263-
}
213+
jsonAsBytes, err := io.ReadAll(resp.Body)
214+
if err != nil {
215+
return
216+
}
264217
265-
fmt.Println("kk dbg getAll() resp.StatusCode:", resp.StatusCode)
218+
fmt.Println("kk dbg getAll() resp.StatusCode:", resp.StatusCode)
266219
267-
switch resp.StatusCode {
268-
case http.StatusOK:
269-
break
270-
default:
271-
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
272-
return
273-
}
220+
switch resp.StatusCode {
221+
case http.StatusOK:
222+
break
223+
default:
224+
logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode)
225+
return
226+
}
274227
275-
// Unmarshal the JSON response
276-
var result map[string]interface{}
277-
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
278-
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
279-
return
280-
}
228+
// Unmarshal the JSON response
229+
var result map[string]interface{}
230+
if err = json.Unmarshal(jsonAsBytes, &result); err != nil {
231+
logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err)
232+
return
233+
}
281234
282-
fmt.Println("kk dbg getAll() documents:")
283-
for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) {
284-
doc := &document{
285-
Id: hit.(map[string]interface{})["_id"].(string),
286-
Index: hit.(map[string]interface{})["_index"].(string),
287-
SizeInBytes: int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)), // TODO: add checks
288-
//Timestamp: hit.(map[string]interface{})["_source"].(map[string]interface{})["timestamp"].(time.Time), // TODO: add checks
289-
MarkedAsDeleted: hit.(map[string]interface{})["_source"].(map[string]interface{})["markedAsDeleted"].(bool), // TODO: add checks
235+
fmt.Println("kk dbg getAll() documents:")
236+
for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) {
237+
doc := &document{
238+
Id: hit.(map[string]interface{})["_id"].(string),
239+
Index: hit.(map[string]interface{})["_index"].(string),
240+
SizeInBytes: int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)), // TODO: add checks
241+
//Timestamp: hit.(map[string]interface{})["_source"].(map[string]interface{})["timestamp"].(time.Time), // TODO: add checks
242+
MarkedAsDeleted: hit.(map[string]interface{})["_source"].(map[string]interface{})["markedAsDeleted"].(bool), // TODO: add checks
243+
}
244+
fmt.Println(doc)
245+
documents = append(documents, doc)
290246
}
291-
fmt.Println(doc)
292-
documents = append(documents, doc)
293-
}
294-
return documents, true
247+
248+
*/
249+
return documents, nil
295250
}
296251

297252
func (db *ElasticDatabaseWithEviction) evict(indexes []string) {

quesma/persistence/model.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ type JSONDatabase interface {
1818
Put(key string, data string) error
1919
}
2020

21-
type JSONDatabaseWithEviction interface { // for sure JSON? maybe not only json? check
22-
Put(doc document) bool
23-
Get(id string) (document, bool)
24-
Delete(id string)
25-
DocCount() (int, bool)
26-
SizeInBytes() (int64, bool)
21+
type DatabaseWithEviction interface { // for sure JSON? maybe not only json? check
22+
Put(doc document) error
23+
Get(id string) (document, error)
24+
Delete(id string) error
25+
DeleteOld(time.Duration) error
26+
DocCount() (int, error)
27+
SizeInBytes() (int64, error)
2728
SizeInBytesLimit() int64
2829
}
2930

0 commit comments

Comments
 (0)