Skip to content

[ENH] Add a tool to reason through the state space of bootstrap. #4558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,12 @@ ALSO, ensure you have copied the `/include` directory of the release to `../incl
Then, to install the plugins, run the `go install` commands from the `Dockerfile`. The exact commands are not here because we would be duplicating where versions live if we did. The `Dockerfile` is the source of truth for the versions.

Once those are all installed, you can run `make build` to build the project and most importantly, the generated protobuf files which your IDE will complain about until they are generated.

## Schema Migrations

From the directory containing the migrations/ and schema/ directories, you can generate a new
migration by editing the files in schema/ directly and then running this command:

```
atlas migrate diff --dir file://migrations --to file://schema --dev-url 'docker://postgres/15/dev?search_path=public'
```
9 changes: 8 additions & 1 deletion go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type LogRepository struct {
sysDb sysdb.ISysDB
}

func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, err error) {
func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, isSealed bool, err error) {
var tx pgx.Tx
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
Expand Down Expand Up @@ -66,6 +66,13 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
return
}
}
if collection.IsSealed {
insertCount = 0
isSealed = true
err = nil
return
}
isSealed = false
params := make([]log.InsertRecordParams, len(records))
for i, record := range records {
offset := collection.RecordEnumerationOffsetPosition + int64(i) + 1
Expand Down
27 changes: 24 additions & 3 deletions go/pkg/log/repository/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ func (suite *LogTestSuite) TestGarbageCollection() {
collectionID2 := types.NewUniqueID()

// Add records to collection 1
count, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
count, isSealed, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
assert.NoError(suite.t, err, "Failed to insert records")
assert.False(suite.t, isSealed, count, "Log sealed")
assert.Equal(suite.t, int64(1), count, "Failed to insert records")

// Add records to collection 2
count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
assert.NoError(suite.t, err, "Failed to insert records")
assert.False(suite.t, isSealed, count, "Log sealed")
assert.Equal(suite.t, int64(1), count, "Failed to insert records")

// Add collection 1 to sysdb
Expand All @@ -72,8 +74,9 @@ func (suite *LogTestSuite) TestGarbageCollection() {
assert.Equal(suite.t, 0, len(records), "Failed to run garbage collection")

// Add records to collection 2, expect offset to reset
count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
assert.NoError(suite.t, err, "Failed to insert records")
assert.False(suite.t, isSealed, count, "Log sealed")
assert.Equal(suite.t, int64(1), count, "Failed to insert records")
records, err = suite.lr.PullRecords(ctx, collectionID2.String(), 1, 1, time.Now().UnixNano())
assert.NoError(suite.t, err, "Failed to pull records")
Expand Down Expand Up @@ -110,6 +113,24 @@ func (suite *LogTestSuite) TestUniqueConstraintPushLogs() {
}
}

// TestSealedLogWontPush verifies that once a collection's log is sealed,
// InsertRecords reports isSealed=true, returns no error, and inserts nothing.
func (suite *LogTestSuite) TestSealedLogWontPush() {
	ctx := context.Background()
	collectionId := types.NewUniqueID()
	// Seed the collection row directly so there is a log to seal.
	params := log.InsertCollectionParams{
		ID:                              collectionId.String(),
		RecordEnumerationOffsetPosition: 1,
		RecordCompactionOffsetPosition:  0,
	}
	_, err := suite.lr.queries.InsertCollection(ctx, params)
	assert.NoError(suite.t, err, "Initializing log should not fail.")
	_, err = suite.lr.queries.SealLog(ctx, collectionId.String())
	assert.NoError(suite.t, err, "Sealing log should not fail.")
	// A push against a sealed log must be a no-op: sealed flag set, zero inserts.
	count, isSealed, err := suite.lr.InsertRecords(ctx, collectionId.String(), [][]byte{{1, 2, 3}})
	assert.NoError(suite.t, err, "Failed to push logs")
	assert.True(suite.t, isSealed, "Did not report was sealed")
	assert.Equal(suite.t, int64(0), count, "Sealed log must not insert records")
}

func TestLogTestSuite(t *testing.T) {
testSuite := new(LogTestSuite)
testSuite.t = t
Expand Down
4 changes: 3 additions & 1 deletion go/pkg/log/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ func (s *logServer) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequ
recordsContent = append(recordsContent, data)
}
var recordCount int64
recordCount, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
var isSealed bool
recordCount, isSealed, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
if err != nil {
return
}
res = &logservicepb.PushLogsResponse{
RecordCount: int32(recordCount),
LogIsSealed: isSealed,
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/log/store/db/copyfrom.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/pkg/log/store/db/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go/pkg/log/store/db/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 31 additions & 5 deletions go/pkg/log/store/db/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go/pkg/log/store/migrations/20250515150704.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Modify "collection" table
ALTER TABLE "collection" ADD COLUMN "is_sealed" boolean NOT NULL DEFAULT false;
3 changes: 2 additions & 1 deletion go/pkg/log/store/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
h1:kG+ejV1DS3youx+m5SNNFYabJeDqfYTdSQHbJtR2/eU=
h1:Ha4eNnpWJQC0flnfRZKnpw06LblDCh5ocPykLIy5koE=
20240404181827_initial.sql h1:xnoD1FcXImqQPJOvaDbTOwTGPLtCP3RibetuaaZeATI=
20250515150704.sql h1:ySONZilpdnd0BHQhsVAoSj5ud7/3gpI03VuwwWqZqrE=
3 changes: 3 additions & 0 deletions go/pkg/log/store/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ INSERT INTO record_log ("offset", collection_id, timestamp, record)
SELECT record_log.offset, $2, record_log.timestamp, record_log.record
FROM record_log
WHERE record_log.collection_id = $1;

-- name: SealLog :one
-- Marks the collection's log as sealed so no further records are accepted.
UPDATE collection SET is_sealed = true WHERE id = $1 RETURNING *;
3 changes: 2 additions & 1 deletion go/pkg/log/store/schema/collection.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
-- Tracks per-collection log offset positions and the sealed flag.
CREATE TABLE collection (
    id text PRIMARY KEY,
    record_compaction_offset_position bigint NOT NULL,
    record_enumeration_offset_position bigint NOT NULL,
    -- When true, InsertRecords refuses new writes and reports the log as sealed.
    is_sealed boolean NOT NULL DEFAULT false
);

-- The `record_compaction_offset_position` column indicates the offset position of the latest compaction, offsets are 1-indexed.
Expand Down
1 change: 1 addition & 0 deletions idl/chromadb/proto/logservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message PushLogsRequest {

// Response to a PushLogs request.
message PushLogsResponse {
  // Number of records inserted; 0 when the log is sealed.
  int32 record_count = 1;
  // True when the collection's log is sealed and the records were not written.
  bool log_is_sealed = 2;
}

message ScoutLogsRequest {
Expand Down
5 changes: 4 additions & 1 deletion rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,10 @@ impl LogService for LogServer {
Status::new(err.code().into(), err.to_string())
}
})?;
Ok(Response::new(PushLogsResponse { record_count }))
Ok(Response::new(PushLogsResponse {
record_count,
log_is_sealed: false,
}))
}
.instrument(span)
.await
Expand Down
14 changes: 11 additions & 3 deletions rust/log/src/grpc_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ impl ChromaError for GrpcPullLogsError {
pub enum GrpcPushLogsError {
#[error("Please backoff exponentially and retry")]
Backoff,
#[error("The log is sealed. No writes can happen.")]
Sealed,
#[error("Failed to push logs: {0}")]
FailedToPushLogs(#[from] tonic::Status),
#[error("Failed to convert records to proto")]
Expand All @@ -55,6 +57,7 @@ impl ChromaError for GrpcPushLogsError {
GrpcPushLogsError::Backoff => ErrorCodes::AlreadyExists,
GrpcPushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal,
GrpcPushLogsError::ConversionError(_) => ErrorCodes::Internal,
GrpcPushLogsError::Sealed => ErrorCodes::FailedPrecondition,
}
}
}
Expand Down Expand Up @@ -323,7 +326,8 @@ impl GrpcLog {
>>()?,
};

self.client_for(tenant, collection_id)
let resp = self
.client_for(tenant, collection_id)
.push_logs(request)
.await
.map_err(|err| {
Expand All @@ -335,8 +339,12 @@ impl GrpcLog {
err.into()
}
})?;

Ok(())
let resp = resp.into_inner();
if resp.log_is_sealed {
Err(GrpcPushLogsError::Sealed)
} else {
Ok(())
}
}

pub(super) async fn fork_logs(
Expand Down
Loading
Loading