Skip to content

Commit f4c446d

Browse files
authored
Preserve RAM at RAG indexing by streaming files (#4613)
At RAG indexation time, we used to read the file from disk, keep it in memory in a buffer and send it to the network. This can be problematic if we have plenty of parallel indexing jobs, as the RAM consumption could skyrocket. So, we now stream the file in a Pipe by chunks of 32KB (the default size of `io.Copy`), avoiding RAM peak consumption even with large files.
2 parents aa4bc59 + 5aa616d commit f4c446d

File tree

1 file changed

+40
-29
lines changed

1 file changed

+40
-29
lines changed

model/rag/index.go

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -221,55 +221,66 @@ func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Chan
221221
}
222222
}
223223

224-
var requestBody bytes.Buffer
225-
writer := multipart.NewWriter(&requestBody)
226-
part, err := writer.CreateFormFile("file", name)
227-
if err != nil {
228-
return err
229-
}
230-
_, err = io.Copy(part, content)
231-
if err != nil {
232-
return err
233-
}
234-
// No need to add filename here, it is already set through the file form
235-
meta := map[string]string{
236-
"md5sum": md5sum,
237-
"datetime": datetime,
238-
"doctype": doctype,
239-
}
240-
ragMetadata, err := json.Marshal(meta)
241-
if err != nil {
242-
return err
243-
}
244-
_ = writer.WriteField("metadata", string(ragMetadata))
245-
err = writer.Close()
246-
if err != nil {
247-
return err
248-
}
249224
u.RawQuery = url.Values{
250225
"dir_id": []string{dirID},
251226
"name": []string{name},
252227
"md5sum": []string{md5sum},
253228
}.Encode()
254-
255229
u.Path = fmt.Sprintf("/indexer/partition/%s/file/%s", inst.Domain, change.DocID)
230+
231+
// Create pipe and writer for file streaming
232+
pr, pw := io.Pipe()
233+
writer := multipart.NewWriter(pw)
234+
235+
go func() {
236+
part, err := writer.CreateFormFile("file", name)
237+
if err != nil {
238+
_ = pw.CloseWithError(err)
239+
return
240+
}
241+
if _, err := io.Copy(part, content); err != nil {
242+
_ = pw.CloseWithError(err)
243+
return
244+
}
245+
246+
// No need to add filename here, it is already set through the file form
247+
meta := map[string]string{
248+
"md5sum": md5sum,
249+
"datetime": datetime,
250+
"doctype": doctype,
251+
}
252+
ragMetadata, err := json.Marshal(meta)
253+
if err != nil {
254+
_ = pw.CloseWithError(err)
255+
return
256+
}
257+
if err := writer.WriteField("metadata", string(ragMetadata)); err != nil {
258+
_ = pw.CloseWithError(err)
259+
return
260+
}
261+
defer pw.Close()
262+
defer writer.Close()
263+
}()
264+
256265
if isNewFile {
257-
req, err = http.NewRequest(http.MethodPost, u.String(), &requestBody)
266+
req, err = http.NewRequest(http.MethodPost, u.String(), pr)
258267
} else {
259-
req, err = http.NewRequest(http.MethodPut, u.String(), &requestBody)
268+
req, err = http.NewRequest(http.MethodPut, u.String(), pr)
260269
}
261270
if err != nil {
262271
return err
263272
}
273+
264274
req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey)
265275
req.Header.Add("Content-Type", writer.FormDataContentType())
266276

267277
res, err = http.DefaultClient.Do(req)
268278
if err != nil {
269279
return err
270280
}
281+
271282
var response map[string]interface{}
272-
if err = json.NewDecoder(res.Body).Decode(&response); err != nil {
283+
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
273284
return err
274285
}
275286
defer res.Body.Close()

0 commit comments

Comments
 (0)