diff --git a/go/README.md b/go/README.md
index 9f0e0dec457..ce11280a4a2 100644
--- a/go/README.md
+++ b/go/README.md
@@ -35,3 +35,12 @@ ALSO, ensure you have copied the `/include` directory of the release to `../incl
 Then, to install the plugins, run the `go install` commands from the `Dockerfile`. The exact commands are not here because we would be duplicating where versions live if we did. The `Dockerfile` is the source of truth for the versions.
 
 Once those are all installed, you can run `make build` to build the project and most importantly, the generated protobuf files which your IDE will complain about until they are generated.
+
+## Schema Migrations
+
+From the directory with the migrations/ and schema/ directories, you can generate a new schema by
+changing the files in schema directly and running this command:
+
+```
+atlas migrate diff --dir file://migrations --to file://schema --dev-url 'docker://postgres/15/dev?search_path=public'
+```
diff --git a/go/pkg/log/repository/log.go b/go/pkg/log/repository/log.go
index 47dd8f4beee..2e2cf4d6790 100644
--- a/go/pkg/log/repository/log.go
+++ b/go/pkg/log/repository/log.go
@@ -23,7 +23,7 @@ type LogRepository struct {
 	sysDb sysdb.ISysDB
 }
 
-func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, err error) {
+func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, isSealed bool, err error) {
 	var tx pgx.Tx
 	tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
 	if err != nil {
@@ -66,6 +66,13 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
 			return
 		}
 	}
+	if collection.IsSealed {
+		insertCount = 0
+		isSealed = true
+		err = nil
+		return
+	}
+	isSealed = false
 	params := make([]log.InsertRecordParams, len(records))
 	for i, record := range records {
 		offset := collection.RecordEnumerationOffsetPosition + int64(i) + 1
diff --git a/go/pkg/log/repository/log_test.go b/go/pkg/log/repository/log_test.go
index b9e17ff3039..5e338bd2583 100644
--- a/go/pkg/log/repository/log_test.go
+++ b/go/pkg/log/repository/log_test.go
@@ -45,13 +45,15 @@ func (suite *LogTestSuite) TestGarbageCollection() {
 	collectionID2 := types.NewUniqueID()
 
 	// Add records to collection 1
-	count, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
+	count, isSealed, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
 	assert.NoError(suite.t, err, "Failed to insert records")
+	assert.False(suite.t, isSealed, count, "Log sealed")
 	assert.Equal(suite.t, int64(1), count, "Failed to insert records")
 
 	// Add records to collection 2
-	count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
+	count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
 	assert.NoError(suite.t, err, "Failed to insert records")
+	assert.False(suite.t, isSealed, count, "Log sealed")
 	assert.Equal(suite.t, int64(1), count, "Failed to insert records")
 
 	// Add collection 1 to sysdb
@@ -72,8 +74,9 @@ func (suite *LogTestSuite) TestGarbageCollection() {
 	assert.Equal(suite.t, 0, len(records), "Failed to run garbage collection")
 
 	// Add records to collection 2, expect offset to reset
-	count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
+	count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
 	assert.NoError(suite.t, err, "Failed to insert records")
+	assert.False(suite.t, isSealed, count, "Log sealed")
 	assert.Equal(suite.t, int64(1), count, "Failed to insert records")
 	records, err = suite.lr.PullRecords(ctx, collectionID2.String(), 1, 1, time.Now().UnixNano())
 	assert.NoError(suite.t, err, "Failed to pull records")
@@ -110,6 +113,24 @@ func (suite *LogTestSuite) TestUniqueConstraintPushLogs() {
 	}
 }
 
+func (suite *LogTestSuite) TestSealedLogWontPush() {
+	ctx := context.Background()
+	collectionId := types.NewUniqueID()
+	params := log.InsertCollectionParams {
+		ID: collectionId.String(),
+		RecordEnumerationOffsetPosition: 1,
+		RecordCompactionOffsetPosition: 0,
+	}
+	_, err := suite.lr.queries.InsertCollection(ctx, params)
+	assert.NoError(suite.t, err, "Initializing log should not fail.")
+	_, err = suite.lr.queries.SealLog(ctx, collectionId.String())
+	assert.NoError(suite.t, err, "Sealing log should not fail.")
+	var isSealed bool
+	_, isSealed, err = suite.lr.InsertRecords(ctx, collectionId.String(), [][]byte{{1,2,3}})
+	assert.NoError(suite.t, err, "Failed to push logs")
+	assert.True(suite.t, isSealed, "Did not report was sealed")
+}
+
 func TestLogTestSuite(t *testing.T) {
 	testSuite := new(LogTestSuite)
 	testSuite.t = t
diff --git a/go/pkg/log/server/server.go b/go/pkg/log/server/server.go
index d7fdcded4d3..bcbd36a8e3e 100644
--- a/go/pkg/log/server/server.go
+++ b/go/pkg/log/server/server.go
@@ -36,12 +36,14 @@ func (s *logServer) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequ
 		recordsContent = append(recordsContent, data)
 	}
 	var recordCount int64
-	recordCount, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
+	var isSealed bool
+	recordCount, isSealed, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
 	if err != nil {
 		return
 	}
 	res = &logservicepb.PushLogsResponse{
 		RecordCount: int32(recordCount),
+		LogIsSealed: isSealed,
 	}
 	return
 }
diff --git a/go/pkg/log/store/db/copyfrom.go b/go/pkg/log/store/db/copyfrom.go
index e31b7d60e79..453012eff80 100644
--- a/go/pkg/log/store/db/copyfrom.go
+++ b/go/pkg/log/store/db/copyfrom.go
@@ -1,6 +1,6 @@
 // Code generated by sqlc. DO NOT EDIT.
 // versions:
-//   sqlc v1.28.0
+//   sqlc v1.29.0
 // source: copyfrom.go
 
 package log
diff --git a/go/pkg/log/store/db/db.go b/go/pkg/log/store/db/db.go
index 7aff917c947..84cd44f954c 100644
--- a/go/pkg/log/store/db/db.go
+++ b/go/pkg/log/store/db/db.go
@@ -1,6 +1,6 @@
 // Code generated by sqlc. DO NOT EDIT.
 // versions:
-//   sqlc v1.28.0
+//   sqlc v1.29.0
 
 package log
 
diff --git a/go/pkg/log/store/db/models.go b/go/pkg/log/store/db/models.go
index 61f400289b7..e9a27b4719b 100644
--- a/go/pkg/log/store/db/models.go
+++ b/go/pkg/log/store/db/models.go
@@ -1,6 +1,6 @@
 // Code generated by sqlc. DO NOT EDIT.
 // versions:
-//   sqlc v1.28.0
+//   sqlc v1.29.0
 
 package log
 
@@ -8,6 +8,7 @@ type Collection struct {
 	ID                              string
 	RecordCompactionOffsetPosition  int64
 	RecordEnumerationOffsetPosition int64
+	IsSealed                        bool
 }
 
 type RecordLog struct {
diff --git a/go/pkg/log/store/db/queries.sql.go b/go/pkg/log/store/db/queries.sql.go
index ecf3279dc6b..341491c19d2 100644
--- a/go/pkg/log/store/db/queries.sql.go
+++ b/go/pkg/log/store/db/queries.sql.go
@@ -1,6 +1,6 @@
 // Code generated by sqlc. DO NOT EDIT.
 // versions:
-//   sqlc v1.28.0
+//   sqlc v1.29.0
 // source: queries.sql
 
 package log
@@ -138,7 +138,7 @@ func (q *Queries) GetBoundsForCollection(ctx context.Context, id string) (GetBou
 }
 
 const getCollectionForUpdate = `-- name: GetCollectionForUpdate :one
-SELECT id, record_compaction_offset_position, record_enumeration_offset_position
+SELECT id, record_compaction_offset_position, record_enumeration_offset_position, is_sealed
 FROM collection
 WHERE id = $1
 FOR UPDATE
@@ -147,7 +147,12 @@ FOR UPDATE
 func (q *Queries) GetCollectionForUpdate(ctx context.Context, id string) (Collection, error) {
 	row := q.db.QueryRow(ctx, getCollectionForUpdate, id)
 	var i Collection
-	err := row.Scan(&i.ID, &i.RecordCompactionOffsetPosition, &i.RecordEnumerationOffsetPosition)
+	err := row.Scan(
+		&i.ID,
+		&i.RecordCompactionOffsetPosition,
+		&i.RecordEnumerationOffsetPosition,
+		&i.IsSealed,
+	)
 	return i, err
 }
 
@@ -233,7 +238,7 @@ func (q *Queries) GetTotalUncompactedRecordsCount(ctx context.Context) (int64, e
 }
 
 const insertCollection = `-- name: InsertCollection :one
-INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position) values($1, $2, $3) returning id, record_compaction_offset_position, record_enumeration_offset_position
+INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position) values($1, $2, $3) returning id, record_compaction_offset_position, record_enumeration_offset_position, is_sealed
 `
 
 type InsertCollectionParams struct {
@@ -245,7 +250,12 @@ type InsertCollectionParams struct {
 func (q *Queries) InsertCollection(ctx context.Context, arg InsertCollectionParams) (Collection, error) {
 	row := q.db.QueryRow(ctx, insertCollection, arg.ID, arg.RecordEnumerationOffsetPosition, arg.RecordCompactionOffsetPosition)
 	var i Collection
-	err := row.Scan(&i.ID, &i.RecordCompactionOffsetPosition, &i.RecordEnumerationOffsetPosition)
+	err := row.Scan(
+		&i.ID,
+		&i.RecordCompactionOffsetPosition,
+		&i.RecordEnumerationOffsetPosition,
+		&i.IsSealed,
+	)
 	return i, err
 }
 
@@ -265,6 +275,22 @@ func (q *Queries) PurgeRecords(ctx context.Context) error {
 	return err
 }
 
+const sealLog = `-- name: SealLog :one
+UPDATE collection SET is_sealed = true WHERE id = $1 returning id, record_compaction_offset_position, record_enumeration_offset_position, is_sealed
+`
+
+func (q *Queries) SealLog(ctx context.Context, id string) (Collection, error) {
+	row := q.db.QueryRow(ctx, sealLog, id)
+	var i Collection
+	err := row.Scan(
+		&i.ID,
+		&i.RecordCompactionOffsetPosition,
+		&i.RecordEnumerationOffsetPosition,
+		&i.IsSealed,
+	)
+	return i, err
+}
+
 const updateCollectionCompactionOffsetPosition = `-- name: UpdateCollectionCompactionOffsetPosition :exec
 UPDATE collection set record_compaction_offset_position = $2 where id = $1
 `
diff --git a/go/pkg/log/store/migrations/20250515150704.sql b/go/pkg/log/store/migrations/20250515150704.sql
new file mode 100644
index 00000000000..d4e50c8697f
--- /dev/null
+++ b/go/pkg/log/store/migrations/20250515150704.sql
@@ -0,0 +1,2 @@
+-- Modify "collection" table
+ALTER TABLE "collection" ADD COLUMN "is_sealed" boolean NOT NULL DEFAULT false;
diff --git a/go/pkg/log/store/migrations/atlas.sum b/go/pkg/log/store/migrations/atlas.sum
index 47a838c240c..d9362b2a884 100644
--- a/go/pkg/log/store/migrations/atlas.sum
+++ b/go/pkg/log/store/migrations/atlas.sum
@@ -1,2 +1,3 @@
-h1:kG+ejV1DS3youx+m5SNNFYabJeDqfYTdSQHbJtR2/eU=
+h1:Ha4eNnpWJQC0flnfRZKnpw06LblDCh5ocPykLIy5koE=
 20240404181827_initial.sql h1:xnoD1FcXImqQPJOvaDbTOwTGPLtCP3RibetuaaZeATI=
+20250515150704.sql h1:ySONZilpdnd0BHQhsVAoSj5ud7/3gpI03VuwwWqZqrE=
diff --git a/go/pkg/log/store/queries/queries.sql b/go/pkg/log/store/queries/queries.sql
index a7de796c52e..964417bb34a 100644
--- a/go/pkg/log/store/queries/queries.sql
+++ b/go/pkg/log/store/queries/queries.sql
@@ -64,3 +64,6 @@ INSERT INTO record_log ("offset", collection_id, timestamp, record)
 SELECT record_log.offset, $2, record_log.timestamp, record_log.record
 FROM record_log
 WHERE record_log.collection_id = $1;
+
+-- name: SealLog :one
+UPDATE collection SET is_sealed = true WHERE id = $1 returning *;
diff --git a/go/pkg/log/store/schema/collection.sql b/go/pkg/log/store/schema/collection.sql
index c3be68e1e2f..b885f0c039f 100644
--- a/go/pkg/log/store/schema/collection.sql
+++ b/go/pkg/log/store/schema/collection.sql
@@ -1,7 +1,8 @@
 CREATE TABLE collection (
     id text PRIMARY KEY,
     record_compaction_offset_position bigint NOT NULL,
-    record_enumeration_offset_position bigint NOT NULL
+    record_enumeration_offset_position bigint NOT NULL,
+    is_sealed boolean NOT NULL DEFAULT false
 );
 
 -- The `record_compaction_offset_position` column indicates the offset position of the latest compaction, offsets are 1-indexed.
diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto
index 92bc615ea77..6e6ceaa76b9 100644
--- a/idl/chromadb/proto/logservice.proto
+++ b/idl/chromadb/proto/logservice.proto
@@ -12,6 +12,7 @@ message PushLogsRequest {
 
 message PushLogsResponse {
   int32 record_count = 1;
+  bool log_is_sealed = 2;
 }
 
 message ScoutLogsRequest {
diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs
index 6f8fdc33ed0..724713ae851 100644
--- a/rust/log-service/src/lib.rs
+++ b/rust/log-service/src/lib.rs
@@ -691,7 +691,10 @@ impl LogService for LogServer {
                     Status::new(err.code().into(), err.to_string())
                 }
             })?;
-            Ok(Response::new(PushLogsResponse { record_count }))
+            Ok(Response::new(PushLogsResponse {
+                record_count,
+                log_is_sealed: false,
+            }))
         }
         .instrument(span)
         .await
diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs
index c079509f1fd..c3b8a3e28bf 100644
--- a/rust/log/src/grpc_log.rs
+++ b/rust/log/src/grpc_log.rs
@@ -43,6 +43,8 @@ impl ChromaError for GrpcPullLogsError {
 pub enum GrpcPushLogsError {
     #[error("Please backoff exponentially and retry")]
     Backoff,
+    #[error("The log is sealed. No writes can happen.")]
+    Sealed,
     #[error("Failed to push logs: {0}")]
     FailedToPushLogs(#[from] tonic::Status),
     #[error("Failed to convert records to proto")]
@@ -55,6 +57,7 @@ impl ChromaError for GrpcPushLogsError {
             GrpcPushLogsError::Backoff => ErrorCodes::AlreadyExists,
             GrpcPushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal,
             GrpcPushLogsError::ConversionError(_) => ErrorCodes::Internal,
+            GrpcPushLogsError::Sealed => ErrorCodes::FailedPrecondition,
         }
     }
 }
@@ -323,7 +326,8 @@ impl GrpcLog {
                 >>()?,
         };
 
-        self.client_for(tenant, collection_id)
+        let resp = self
+            .client_for(tenant, collection_id)
             .push_logs(request)
             .await
             .map_err(|err| {
@@ -335,8 +339,12 @@ impl GrpcLog {
                     err.into()
                 }
             })?;
-
-        Ok(())
+        let resp = resp.into_inner();
+        if resp.log_is_sealed {
+            Err(GrpcPushLogsError::Sealed)
+        } else {
+            Ok(())
+        }
     }
 
     pub(super) async fn fork_logs(
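As a companion note outside the diff, here is a minimal Go sketch of how a caller of `PushLogs` might consume the new `log_is_sealed` field on `PushLogsResponse`. The import path, helper name, and sentinel error below are illustrative assumptions; only the `RecordCount` and `LogIsSealed` fields come from the generated bindings this patch touches.

```go
package main

import (
	"errors"
	"fmt"

	// Assumed import path for the generated log service bindings; adjust to the repo's module layout.
	logservicepb "github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
)

// ErrLogSealed is a hypothetical sentinel for surfacing the sealed state to callers.
var ErrLogSealed = errors.New("log is sealed; no writes can happen")

// recordsPushed illustrates the intended contract: when LogIsSealed is true the
// repository reports an insert count of 0, so the push should be treated as
// rejected rather than partially applied.
func recordsPushed(res *logservicepb.PushLogsResponse) (int32, error) {
	if res.LogIsSealed {
		return 0, ErrLogSealed
	}
	return res.RecordCount, nil
}

func main() {
	res := &logservicepb.PushLogsResponse{RecordCount: 3}
	if n, err := recordsPushed(res); err != nil {
		fmt.Println("push rejected:", err)
	} else {
		fmt.Println("records accepted:", n)
	}
}
```

This mirrors the Rust client change in `rust/log/src/grpc_log.rs`, where the same flag is surfaced as `GrpcPushLogsError::Sealed` and mapped to `ErrorCodes::FailedPrecondition`.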