Skip to content

Commit 048583d

Browse files
authored
[ENH]: Add retry for add, update and upsert (#4457)
## Description of changes

_Summarize the changes made by this PR._

- Improvements & Bug fixes
  - Adds retry with exponential backoff to coll.add(), coll.update() and coll.upsert() for duplicate key errors returned by Postgres.
- New functionality
  - ...

## Test plan

_How are these changes tested?_

- [x] Tests pass locally with `pytest` for Python, `yarn test` for JS, `cargo test` for Rust

## Documentation Changes

None
1 parent 8ec2671 commit 048583d

File tree

4 files changed

+175
-35
lines changed

4 files changed

+175
-35
lines changed

go/pkg/log/repository/log.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package repository
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
log "github.com/chroma-core/chroma/go/pkg/log/store/db"
910
"github.com/chroma-core/chroma/go/pkg/log/sysdb"
1011
"github.com/jackc/pgx/v5"
12+
"github.com/jackc/pgx/v5/pgconn"
1113
"github.com/jackc/pgx/v5/pgxpool"
1214
trace_log "github.com/pingcap/log"
1315
"go.uber.org/zap"
@@ -68,6 +70,14 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
6870
}
6971
insertCount, err = queriesWithTx.InsertRecord(ctx, params)
7072
if err != nil {
73+
var pgErr *pgconn.PgError
74+
// This is a retryable error and should be retried by upstream. It happens
75+
// when two concurrent adds to the same collection happen.
76+
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
77+
trace_log.Error("Duplicate key error while inserting into record_log", zap.String("collectionId", collectionId), zap.String("detail", pgErr.Detail))
78+
err = status.Error(codes.AlreadyExists, fmt.Sprintf("Duplicate key error while inserting into record_log: %s", pgErr.Detail))
79+
return
80+
}
7181
trace_log.Error("Error in inserting records to record_log table", zap.Error(err), zap.String("collectionId", collectionId))
7282
return
7383
}

go/pkg/log/repository/log_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package repository
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67
"time"
78

89
"github.com/chroma-core/chroma/go/pkg/log/configuration"
10+
log "github.com/chroma-core/chroma/go/pkg/log/store/db"
911
"github.com/chroma-core/chroma/go/pkg/log/sysdb"
1012
"github.com/chroma-core/chroma/go/pkg/types"
1113
libs2 "github.com/chroma-core/chroma/go/shared/libs"
14+
"github.com/jackc/pgx/v5/pgconn"
1215
"github.com/jackc/pgx/v5/pgxpool"
1316
"github.com/stretchr/testify/assert"
1417
"github.com/stretchr/testify/suite"
@@ -79,6 +82,34 @@ func (suite *LogTestSuite) TestGarbageCollection() {
7982
assert.Equal(suite.t, int64(1), records[0].Offset, "Failed to run garbage collection")
8083
}
8184

85+
func (suite *LogTestSuite) TestUniqueConstraintPushLogs() {
86+
ctx := context.Background()
87+
collectionId := types.NewUniqueID()
88+
89+
records := [][]byte{
90+
{1, 2, 3},
91+
{4, 5, 6},
92+
}
93+
params := make([]log.InsertRecordParams, 2)
94+
for i, record := range records {
95+
offset := 1
96+
params[i] = log.InsertRecordParams{
97+
CollectionID: collectionId.String(),
98+
Record: record,
99+
Offset: int64(offset),
100+
Timestamp: time.Now().UnixNano(),
101+
}
102+
}
103+
_, err := suite.lr.queries.InsertRecord(ctx, params)
104+
assert.Error(suite.t, err, "Failed to insert records")
105+
var pgErr *pgconn.PgError
106+
if errors.As(err, &pgErr) {
107+
assert.Equal(suite.t, "23505", pgErr.Code, "Expected SQLSTATE 23505 for duplicate key")
108+
} else {
109+
assert.Fail(suite.t, "Expected pgconn.PgError but got different error", err)
110+
}
111+
}
112+
82113
func TestLogTestSuite(t *testing.T) {
83114
testSuite := new(LogTestSuite)
84115
testSuite.t = t

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 130 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
config::FrontendConfig, executor::Executor, types::errors::ValidationError,
33
CollectionsWithSegmentsProvider,
44
};
5-
use backon::Retryable;
5+
use backon::{ExponentialBuilder, Retryable};
66
use chroma_config::{registry, Configurable};
77
use chroma_error::{ChromaError, ErrorCodes};
88
use chroma_log::{LocalCompactionManager, LocalCompactionManagerConfig, Log};
@@ -35,10 +35,10 @@ use chroma_types::{
3535
};
3636
use opentelemetry::global;
3737
use opentelemetry::metrics::Counter;
38-
use std::collections::HashSet;
3938
use std::sync::atomic::{AtomicUsize, Ordering};
4039
use std::sync::Arc;
4140
use std::time::{SystemTime, UNIX_EPOCH};
41+
use std::{collections::HashSet, time::Duration};
4242

4343
use super::utils::to_records;
4444

@@ -49,6 +49,9 @@ struct Metrics {
4949
count_retries_counter: Counter<u64>,
5050
query_retries_counter: Counter<u64>,
5151
get_retries_counter: Counter<u64>,
52+
add_retries_counter: Counter<u64>,
53+
update_retries_counter: Counter<u64>,
54+
upsert_retries_counter: Counter<u64>,
5255
}
5356

5457
#[derive(Clone, Debug)]
@@ -61,6 +64,7 @@ pub struct ServiceBasedFrontend {
6164
max_batch_size: u32,
6265
metrics: Arc<Metrics>,
6366
default_knn_index: KnnIndex,
67+
retries_builder: ExponentialBuilder,
6468
}
6569

6670
impl ServiceBasedFrontend {
@@ -78,14 +82,32 @@ impl ServiceBasedFrontend {
7882
let delete_retries_counter = meter.u64_counter("delete_retries").build();
7983
let count_retries_counter = meter.u64_counter("count_retries").build();
8084
let query_retries_counter = meter.u64_counter("query_retries").build();
81-
let get_retries_counter = meter.u64_counter("query_retries").build();
85+
let get_retries_counter = meter.u64_counter("get_retries").build();
86+
let add_retries_counter = meter.u64_counter("add_retries").build();
87+
let update_retries_counter = meter.u64_counter("update_retries").build();
88+
let upsert_retries_counter = meter.u64_counter("upsert_retries").build();
8289
let metrics = Arc::new(Metrics {
8390
fork_retries_counter,
8491
delete_retries_counter,
8592
count_retries_counter,
8693
query_retries_counter,
8794
get_retries_counter,
95+
add_retries_counter,
96+
update_retries_counter,
97+
upsert_retries_counter,
8898
});
99+
// factor: 2.0,
100+
// min_delay_ms: 100,
101+
// max_delay_ms: 5000,
102+
// max_attempts: 5,
103+
// jitter: true,
104+
// TODO(Sanket): Ideally config for this.
105+
let retries_builder = ExponentialBuilder::default()
106+
.with_max_times(5)
107+
.with_factor(2.0)
108+
.with_max_delay(Duration::from_millis(5000))
109+
.with_min_delay(Duration::from_millis(100))
110+
.with_jitter();
89111
ServiceBasedFrontend {
90112
allow_reset,
91113
executor,
@@ -95,6 +117,7 @@ impl ServiceBasedFrontend {
95117
max_batch_size,
96118
metrics,
97119
default_knn_index,
120+
retries_builder,
98121
}
99122
}
100123

@@ -630,6 +653,14 @@ impl ServiceBasedFrontend {
630653
res
631654
}
632655

656+
/// Pushes `records` to the log for `collection_id` by delegating to
/// `self.log_client.push_logs` and returning that call's result unchanged.
///
/// This function makes exactly one attempt; it contains no retry logic of
/// its own.
pub async fn retryable_push_logs(
657+
&mut self,
658+
collection_id: CollectionUuid,
659+
records: Vec<OperationRecord>,
660+
) -> Result<(), Box<dyn ChromaError>> {
661+
self.log_client.push_logs(collection_id, records).await
662+
}
663+
633664
pub async fn add(
634665
&mut self,
635666
AddCollectionRecordsRequest {
@@ -656,16 +687,29 @@ impl ServiceBasedFrontend {
656687
to_records(ids, embeddings, documents, uris, metadatas, Operation::Add)
657688
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
658689

659-
self.log_client
660-
.push_logs(collection_id, records)
661-
.await
662-
.map_err(|err| {
663-
if err.code() == ErrorCodes::Unavailable {
664-
AddCollectionRecordsError::Backoff
665-
} else {
666-
AddCollectionRecordsError::Other(Box::new(err) as _)
690+
let retries = Arc::new(AtomicUsize::new(0));
691+
let add_to_retry = || {
692+
let mut self_clone = self.clone();
693+
let records_clone = records.clone();
694+
async move {
695+
self_clone
696+
.retryable_push_logs(collection_id, records_clone)
697+
.await
698+
}
699+
};
700+
let res = add_to_retry
701+
.retry(self.retries_builder)
702+
.when(|e| matches!(e.code(), ErrorCodes::AlreadyExists))
703+
.notify(|_, _| {
704+
let retried = retries.fetch_add(1, Ordering::Relaxed);
705+
if retried > 0 {
706+
tracing::info!("Retrying add() request for collection {}", collection_id);
667707
}
668-
})?;
708+
})
709+
.await;
710+
self.metrics
711+
.add_retries_counter
712+
.add(retries.load(Ordering::Relaxed) as u64, &[]);
669713

670714
// TODO: Submit event after the response is sent
671715
MeterEvent::CollectionWrite {
@@ -678,7 +722,16 @@ impl ServiceBasedFrontend {
678722
.submit()
679723
.await;
680724

681-
Ok(AddCollectionRecordsResponse {})
725+
match res {
726+
Ok(()) => Ok(AddCollectionRecordsResponse {}),
727+
Err(e) => {
728+
if e.code() == ErrorCodes::AlreadyExists {
729+
Err(AddCollectionRecordsError::Backoff)
730+
} else {
731+
Err(AddCollectionRecordsError::Other(Box::new(e) as _))
732+
}
733+
}
734+
}
682735
}
683736

684737
pub async fn update(
@@ -711,16 +764,29 @@ impl ServiceBasedFrontend {
711764
)
712765
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
713766

714-
self.log_client
715-
.push_logs(collection_id, records)
716-
.await
717-
.map_err(|err| {
718-
if err.code() == ErrorCodes::Unavailable {
719-
UpdateCollectionRecordsError::Backoff
720-
} else {
721-
UpdateCollectionRecordsError::Other(Box::new(err) as _)
767+
let retries = Arc::new(AtomicUsize::new(0));
768+
let add_to_retry = || {
769+
let mut self_clone = self.clone();
770+
let records_clone = records.clone();
771+
async move {
772+
self_clone
773+
.retryable_push_logs(collection_id, records_clone)
774+
.await
775+
}
776+
};
777+
let res = add_to_retry
778+
.retry(self.retries_builder)
779+
.when(|e| matches!(e.code(), ErrorCodes::AlreadyExists))
780+
.notify(|_, _| {
781+
let retried = retries.fetch_add(1, Ordering::Relaxed);
782+
if retried > 0 {
783+
tracing::info!("Retrying update() request for collection {}", collection_id);
722784
}
723-
})?;
785+
})
786+
.await;
787+
self.metrics
788+
.update_retries_counter
789+
.add(retries.load(Ordering::Relaxed) as u64, &[]);
724790

725791
// TODO: Submit event after the response is sent
726792
MeterEvent::CollectionWrite {
@@ -733,7 +799,16 @@ impl ServiceBasedFrontend {
733799
.submit()
734800
.await;
735801

736-
Ok(UpdateCollectionRecordsResponse {})
802+
match res {
803+
Ok(()) => Ok(UpdateCollectionRecordsResponse {}),
804+
Err(e) => {
805+
if e.code() == ErrorCodes::AlreadyExists {
806+
Err(UpdateCollectionRecordsError::Backoff)
807+
} else {
808+
Err(UpdateCollectionRecordsError::Other(Box::new(e) as _))
809+
}
810+
}
811+
}
737812
}
738813

739814
pub async fn upsert(
@@ -768,16 +843,29 @@ impl ServiceBasedFrontend {
768843
)
769844
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
770845

771-
self.log_client
772-
.push_logs(collection_id, records)
773-
.await
774-
.map_err(|err| {
775-
if err.code() == ErrorCodes::Unavailable {
776-
UpsertCollectionRecordsError::Backoff
777-
} else {
778-
UpsertCollectionRecordsError::Other(Box::new(err) as _)
846+
let retries = Arc::new(AtomicUsize::new(0));
847+
let add_to_retry = || {
848+
let mut self_clone = self.clone();
849+
let records_clone = records.clone();
850+
async move {
851+
self_clone
852+
.retryable_push_logs(collection_id, records_clone)
853+
.await
854+
}
855+
};
856+
let res = add_to_retry
857+
.retry(self.retries_builder)
858+
.when(|e| matches!(e.code(), ErrorCodes::AlreadyExists))
859+
.notify(|_, _| {
860+
let retried = retries.fetch_add(1, Ordering::Relaxed);
861+
if retried > 0 {
862+
tracing::info!("Retrying upsert() request for collection {}", collection_id);
779863
}
780-
})?;
864+
})
865+
.await;
866+
self.metrics
867+
.upsert_retries_counter
868+
.add(retries.load(Ordering::Relaxed) as u64, &[]);
781869

782870
// TODO: Submit event after the response is sent
783871
MeterEvent::CollectionWrite {
@@ -790,7 +878,16 @@ impl ServiceBasedFrontend {
790878
.submit()
791879
.await;
792880

793-
Ok(UpsertCollectionRecordsResponse {})
881+
match res {
882+
Ok(()) => Ok(UpsertCollectionRecordsResponse {}),
883+
Err(e) => {
884+
if e.code() == ErrorCodes::AlreadyExists {
885+
Err(UpsertCollectionRecordsError::Backoff)
886+
} else {
887+
Err(UpsertCollectionRecordsError::Other(Box::new(e) as _))
888+
}
889+
}
890+
}
794891
}
795892

796893
pub async fn retryable_delete(

rust/log/src/grpc_log.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub enum GrpcPushLogsError {
5252
impl ChromaError for GrpcPushLogsError {
5353
fn code(&self) -> ErrorCodes {
5454
match self {
55-
GrpcPushLogsError::Backoff => ErrorCodes::Unavailable,
55+
GrpcPushLogsError::Backoff => ErrorCodes::AlreadyExists,
5656
GrpcPushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal,
5757
GrpcPushLogsError::ConversionError(_) => ErrorCodes::Internal,
5858
}
@@ -315,7 +315,9 @@ impl GrpcLog {
315315
.push_logs(request)
316316
.await
317317
.map_err(|err| {
318-
if err.code() == ErrorCodes::Unavailable.into() {
318+
if err.code() == ErrorCodes::Unavailable.into()
319+
|| err.code() == ErrorCodes::AlreadyExists.into()
320+
{
319321
GrpcPushLogsError::Backoff
320322
} else {
321323
err.into()

0 commit comments

Comments
 (0)