Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ func (s *Stream) initFromGenesisLocked(
ctx,
s.streamId,
storageMb,
false,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func (s *StreamCache) readGenesisAndCreateLocalStream(
ctx,
streamId,
&storage.MiniblockDescriptor{Data: mb, HasLegacySnapshot: true},
false,
)
if err != nil {
if IsRiverErrorCode(err, Err_ALREADY_EXISTS) {
Expand Down
4 changes: 2 additions & 2 deletions core/node/events/stream_ephemeral_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Test_StreamCache_normalizeEphemeralStream(t *testing.T) {
storageMb, err := mb.AsStorageMb()
tc.require.NoError(err)

err = leaderInstance.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb)
err = leaderInstance.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb, false)
tc.require.NoError(err)

mbRef := *mb.Ref
Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_StreamCache_normalizeEphemeralStream(t *testing.T) {
storageMb, err := mb.AsStorageMb()
tc.require.NoError(err)

err = leaderInstance.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb)
err = leaderInstance.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb, false)
tc.require.NoError(err)

mbRef := *mb.Ref
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestReconciler_SealedEphemeral(t *testing.T) {
)
genesisStorage, err := genesis.AsStorageMb()
require.NoError(err)
err = leader.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, genesisStorage)
err = leader.params.Storage.CreateEphemeralStreamStorage(ctx, streamId, genesisStorage, false)
require.NoError(err)

// Write ephemeral miniblocks [1..chunks] on leader
Expand Down
6 changes: 3 additions & 3 deletions core/node/events/stream_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestStreamCreation_LegacySnapshot(t *testing.T) {
require.Empty(t, storageMb.Snapshot, "Genesis miniblock should not have separate snapshot field")

// Create stream storage with the genesis miniblock
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageMb)
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageMb, false)
require.NoError(t, err)

// Read back the stream and verify
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestStreamCreation_NonLegacySnapshot(t *testing.T) {
require.NoError(t, err)

// Create stream with genesis
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageGenesis)
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageGenesis, false)
require.NoError(t, err)

// Now create a miniblock with non-legacy snapshot
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestStreamStorage_VerifyHasLegacySnapshotFlag(t *testing.T) {
require.True(t, storageGenesis.HasLegacySnapshot, "Genesis should have legacy snapshot flag")

// Create stream
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageGenesis)
err = streamStore.Storage.CreateStreamStorage(ctx, streamId, storageGenesis, false)
require.NoError(t, err)

// Read debug data to verify snapshot index
Expand Down
2 changes: 1 addition & 1 deletion core/node/rpc/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (a *Archiver) ArchiveStream(ctx context.Context, stream *ArchiveStream) (er
if mbsInDb <= -1 {
maxBlockNum, err := a.storage.GetMaxArchivedMiniblockNumber(ctx, stream.streamId)
if err != nil && IsRiverErrorCode(err, Err_NOT_FOUND) {
err = a.storage.CreateStreamArchiveStorage(ctx, stream.streamId)
err = a.storage.CreateStreamArchiveStorage(ctx, stream.streamId, false)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions core/node/rpc/create_media_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (s *Service) createReplicatedMediaStream(
ctx,
streamId,
&storage.MiniblockDescriptor{Data: mbBytes, HasLegacySnapshot: true},
false,
)
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/node/rpc/miniblock_scrub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func writeStreamBackToStore(
require.NotNil(mb2)

// Re-write the stream with corrupt block 1
require.NoError(store.CreateStreamStorage(ctx, streamId, &storage.MiniblockDescriptor{Data: blocks[0].Data}))
require.NoError(store.CreateStreamStorage(ctx, streamId, &storage.MiniblockDescriptor{Data: blocks[0].Data}, false))
require.NoError(
store.WriteMiniblocks(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion core/node/rpc/node2node_ephemeral.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *Service) allocateEphemeralStream(
return nil, err
}

if err = s.storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb); err != nil {
if err = s.storage.CreateEphemeralStreamStorage(ctx, streamId, storageMb, false); err != nil {
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE es DROP COLUMN IF EXISTS lightweight;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE es ADD COLUMN IF NOT EXISTS lightweight BOOLEAN NOT NULL DEFAULT FALSE;
16 changes: 9 additions & 7 deletions core/node/storage/pg_ephemeral_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func (s *PostgresStreamStore) CreateEphemeralStreamStorage(
ctx context.Context,
streamId StreamId,
genesisMiniblock *MiniblockDescriptor,
isLightweight bool,
) error {
return s.txRunner(
ctx,
"CreateEphemeralStreamStorage",
pgx.ReadWrite,
func(ctx context.Context, tx pgx.Tx) error {
return s.createEphemeralStreamStorageTx(ctx, tx, streamId, genesisMiniblock)
return s.createEphemeralStreamStorageTx(ctx, tx, streamId, genesisMiniblock, isLightweight)
},
nil,
"streamId", streamId,
Expand All @@ -72,15 +73,16 @@ func (s *PostgresStreamStore) createEphemeralStreamStorageTx(
tx pgx.Tx,
streamId StreamId,
genesisMiniblock *MiniblockDescriptor,
isLightweight bool,
) error {
sql := s.sqlForStream(
`
INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, 0, true, true);
INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata, snapshot) VALUES ($1, 0, $2, $3);`,
INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, 0, true, true, $2);
INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata, snapshot) VALUES ($1, 0, $3, $4);`,
streamId,
)

if _, err := tx.Exec(ctx, sql, streamId, genesisMiniblock.Data, genesisMiniblock.Snapshot); err != nil {
if _, err := tx.Exec(ctx, sql, streamId, isLightweight, genesisMiniblock.Data, genesisMiniblock.Snapshot); err != nil {
if pgerr, ok := err.(*pgconn.PgError); ok && pgerr.Code == pgerrcode.UniqueViolation {
return WrapRiverError(Err_ALREADY_EXISTS, err).Message("stream already exists")
}
Expand Down Expand Up @@ -187,7 +189,7 @@ func (s *PostgresStreamStore) writeEphemeralMiniblockTx(
if _, err := s.lockEphemeralStream(ctx, tx, streamId, true); err != nil {
// If the given ephemeral stream does not exist, create one by adding an extra query.
if IsRiverErrorCode(err, Err_NOT_FOUND) {
query += `INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, 0, true, true);`
query += `INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, 0, true, true, false);`
} else {
return err
}
Expand Down Expand Up @@ -309,8 +311,8 @@ func (s *PostgresStreamStore) normalizeEphemeralStreamTx(
if _, err = tx.Exec(
ctx,
s.sqlForStream(
`INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral)
VALUES ($1, 0, true, false) ON CONFLICT (stream_id) DO UPDATE SET ephemeral = false;
`INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight)
VALUES ($1, 0, true, false, false) ON CONFLICT (stream_id) DO UPDATE SET ephemeral = false;
INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, $2, -1);`,
streamId,
),
Expand Down
4 changes: 2 additions & 2 deletions core/node/storage/pg_storage_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func TestArchive(t *testing.T) {
require.Error(err)
require.Equal(Err_NOT_FOUND, AsRiverError(err).Code)

err = pgStreamStore.CreateStreamArchiveStorage(ctx, streamId1)
err = pgStreamStore.CreateStreamArchiveStorage(ctx, streamId1, false)
require.NoError(err)

err = pgStreamStore.CreateStreamArchiveStorage(ctx, streamId1)
err = pgStreamStore.CreateStreamArchiveStorage(ctx, streamId1, false)
require.Error(err)
require.Equal(Err_ALREADY_EXISTS, AsRiverError(err).Code)

Expand Down
2 changes: 1 addition & 1 deletion core/node/storage/pg_stream_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDbLocking(t *testing.T) {

_, err = pool.Exec(
ctx,
`INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, 0, true, false);
`INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, 0, true, false, false);
INSERT INTO miniblocks_r00 (stream_id, seq_num, blockdata) VALUES ($1, 0, $2);
INSERT INTO minipools_r00 (stream_id, generation, slot_num) VALUES ($1, 1, -1);`,
streamId,
Expand Down
20 changes: 12 additions & 8 deletions core/node/storage/pg_stream_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (s *PostgresStreamStore) CreateStreamStorage(
ctx context.Context,
streamId StreamId,
genesisMiniblock *MiniblockDescriptor,
isLightweight bool,
) error {
if len(genesisMiniblock.Data) == 0 {
return RiverError(
Expand All @@ -540,7 +541,7 @@ func (s *PostgresStreamStore) CreateStreamStorage(
"CreateStreamStorage",
pgx.ReadWrite,
func(ctx context.Context, tx pgx.Tx) error {
return s.createStreamStorageTx(ctx, tx, streamId, genesisMiniblock)
return s.createStreamStorageTx(ctx, tx, streamId, genesisMiniblock, isLightweight)
},
nil,
"streamId", streamId,
Expand Down Expand Up @@ -568,15 +569,16 @@ func (s *PostgresStreamStore) createStreamStorageTx(
tx pgx.Tx,
streamId StreamId,
genesisMiniblock *MiniblockDescriptor,
isLightweight bool,
) error {
sql := s.sqlForStream(
`
INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, 0, true, false);
INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata, snapshot) VALUES ($1, 0, $2, $3);
INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, 0, true, false, $2);
INSERT INTO {{miniblocks}} (stream_id, seq_num, blockdata, snapshot) VALUES ($1, 0, $3, $4);
INSERT INTO {{minipools}} (stream_id, generation, slot_num) VALUES ($1, 1, -1);`,
streamId,
)
_, err := tx.Exec(ctx, sql, streamId, genesisMiniblock.Data, genesisMiniblock.Snapshot)
_, err := tx.Exec(ctx, sql, streamId, isLightweight, genesisMiniblock.Data, genesisMiniblock.Snapshot)
if err != nil {
if isPgError(err, pgerrcode.UniqueViolation) {
return WrapRiverError(Err_ALREADY_EXISTS, err).Message("stream already exists").Tag("streamId", streamId)
Expand Down Expand Up @@ -631,13 +633,14 @@ func (s *PostgresStreamStore) maybeOverwriteCorruptGenesisMiniblockTx(
func (s *PostgresStreamStore) CreateStreamArchiveStorage(
ctx context.Context,
streamId StreamId,
isLightweight bool,
) error {
return s.txRunner(
ctx,
"CreateStreamArchiveStorage",
pgx.ReadWrite,
func(ctx context.Context, tx pgx.Tx) error {
return s.createStreamArchiveStorageTx(ctx, tx, streamId)
return s.createStreamArchiveStorageTx(ctx, tx, streamId, isLightweight)
},
nil,
"streamId", streamId,
Expand All @@ -648,9 +651,10 @@ func (s *PostgresStreamStore) createStreamArchiveStorageTx(
ctx context.Context,
tx pgx.Tx,
streamId StreamId,
isLightweight bool,
) error {
sql := `INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated) VALUES ($1, 0, true);`
if _, err := tx.Exec(ctx, sql, streamId); err != nil {
sql := `INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, lightweight) VALUES ($1, 0, true, $2);`
if _, err := tx.Exec(ctx, sql, streamId, isLightweight); err != nil {
if isPgError(err, pgerrcode.UniqueViolation) {
return WrapRiverError(Err_ALREADY_EXISTS, err).Message("stream already exists")
}
Expand Down Expand Up @@ -2730,7 +2734,7 @@ func (s *PostgresStreamStore) reinitializeStreamStorageTx(
// This handles race conditions atomically
tag, err := tx.Exec(
ctx,
"INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, $2, true, false) ON CONFLICT (stream_id) DO NOTHING",
"INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, $2, true, false, false) ON CONFLICT (stream_id) DO NOTHING",
streamId,
lastSnapshotMiniblockNum,
)
Expand Down
2 changes: 1 addition & 1 deletion core/node/storage/pg_stream_store_gaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestGetMiniblockNumberRanges(t *testing.T) {
Hash: common.HexToHash("0x01"),
Data: []byte("genesis"),
Snapshot: []byte("snapshot0"),
},
}, false,
)
require.NoError(err)

Expand Down
8 changes: 4 additions & 4 deletions core/node/storage/pg_stream_store_reinitialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestReinitializeStreamStorage_UpdateExisting(t *testing.T) {
Data: []byte("genesis miniblock"),
Snapshot: []byte("genesis snapshot"),
}
err := store.CreateStreamStorage(ctx, streamId, genesisMb)
err := store.CreateStreamStorage(ctx, streamId, genesisMb, false)
require.NoError(err)

// Write a miniblock to extend the stream
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestReinitializeStreamStorage_CandidateCleanup(t *testing.T) {
Data: []byte("genesis miniblock"),
Snapshot: []byte("genesis snapshot"),
}
err := store.CreateStreamStorage(ctx, streamId, genesisMb)
err := store.CreateStreamStorage(ctx, streamId, genesisMb, false)
require.NoError(err)

// Add multiple miniblock candidates
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestReinitializeStreamStorage_TransactionRollback(t *testing.T) {
Data: []byte("genesis miniblock"),
Snapshot: []byte("genesis snapshot"),
}
err := store.CreateStreamStorage(ctx, streamId, genesisMb)
err := store.CreateStreamStorage(ctx, streamId, genesisMb, false)
require.NoError(err)

// Add event to minipool
Expand Down Expand Up @@ -677,7 +677,7 @@ func TestReinitializeStreamStorage_StreamWithoutMiniblocks(t *testing.T) {
func(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(
ctx,
"INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral) VALUES ($1, 0, true, false)",
"INSERT INTO es (stream_id, latest_snapshot_miniblock, migrated, ephemeral, lightweight) VALUES ($1, 0, true, false, false)",
streamId,
)
return err
Expand Down
Loading