Skip to content

Commit b7ec18d

Browse files
authored
feat: Add Go-layer append-only enforcement for position records (#933)
1 parent d49b960 commit b7ec18d

6 files changed

Lines changed: 535 additions & 82 deletions

File tree

services/position-keeping/adapters/persistence/position_repository.go

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,11 +526,127 @@ func (r *PositionRepository) BeginTx(ctx context.Context) (pgx.Tx, error) {
526526
return tx, nil
527527
}
528528

529+
// SoftDelete marks a position as deleted by setting deleted_at = NOW().
530+
// This is an allowed UPDATE operation on the append-only position table
531+
// since deleted_at is explicitly excluded from immutable field enforcement.
532+
// Returns domain.ErrNotFound if the position doesn't exist or is already deleted.
533+
func (r *PositionRepository) SoftDelete(ctx context.Context, id uuid.UUID) error {
534+
tx, err := r.pool.Begin(ctx)
535+
if err != nil {
536+
return fmt.Errorf("failed to begin transaction: %w", err)
537+
}
538+
defer func() {
539+
_ = tx.Rollback(ctx)
540+
}()
541+
542+
if err := r.setSearchPath(ctx, tx); err != nil {
543+
return err
544+
}
545+
546+
result, err := tx.Exec(ctx,
547+
"UPDATE position SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL",
548+
id,
549+
)
550+
if err != nil {
551+
return fmt.Errorf("failed to soft delete position: %w", err)
552+
}
553+
554+
if result.RowsAffected() == 0 {
555+
return domain.ErrNotFound
556+
}
557+
558+
if err := tx.Commit(ctx); err != nil {
559+
return fmt.Errorf("failed to commit transaction: %w", err)
560+
}
561+
562+
return nil
563+
}
564+
565+
// SoftDeleteBatch marks multiple positions as deleted atomically.
566+
// This is an allowed UPDATE operation on the append-only position table.
567+
// Positions that are already deleted or don't exist are silently skipped.
568+
func (r *PositionRepository) SoftDeleteBatch(ctx context.Context, ids []uuid.UUID) error {
569+
if len(ids) == 0 {
570+
return nil
571+
}
572+
573+
tx, err := r.pool.Begin(ctx)
574+
if err != nil {
575+
return fmt.Errorf("failed to begin transaction: %w", err)
576+
}
577+
defer func() {
578+
_ = tx.Rollback(ctx)
579+
}()
580+
581+
if err := r.setSearchPath(ctx, tx); err != nil {
582+
return err
583+
}
584+
585+
_, err = tx.Exec(ctx,
586+
"UPDATE position SET deleted_at = NOW() WHERE id = ANY($1) AND deleted_at IS NULL",
587+
ids,
588+
)
589+
if err != nil {
590+
return fmt.Errorf("failed to soft delete positions: %w", err)
591+
}
592+
593+
if err := tx.Commit(ctx); err != nil {
594+
return fmt.Errorf("failed to commit transaction: %w", err)
595+
}
596+
597+
return nil
598+
}
599+
600+
// UpdateAttributes updates only the attributes JSONB field for a position.
601+
// This is an allowed UPDATE operation on the append-only position table
602+
// since attributes is explicitly excluded from immutable field enforcement.
603+
// Returns domain.ErrNotFound if the position doesn't exist or is deleted.
604+
func (r *PositionRepository) UpdateAttributes(ctx context.Context, id uuid.UUID, attributes map[string]string) error {
605+
tx, err := r.pool.Begin(ctx)
606+
if err != nil {
607+
return fmt.Errorf("failed to begin transaction: %w", err)
608+
}
609+
defer func() {
610+
_ = tx.Rollback(ctx)
611+
}()
612+
613+
if err := r.setSearchPath(ctx, tx); err != nil {
614+
return err
615+
}
616+
617+
var attributesJSON []byte
618+
if attributes != nil {
619+
attributesJSON, err = json.Marshal(attributes)
620+
if err != nil {
621+
return fmt.Errorf("failed to marshal attributes: %w", err)
622+
}
623+
}
624+
625+
result, err := tx.Exec(ctx,
626+
"UPDATE position SET attributes = $1 WHERE id = $2 AND deleted_at IS NULL",
627+
attributesJSON, id,
628+
)
629+
if err != nil {
630+
return fmt.Errorf("failed to update attributes: %w", err)
631+
}
632+
633+
if result.RowsAffected() == 0 {
634+
return domain.ErrNotFound
635+
}
636+
637+
if err := tx.Commit(ctx); err != nil {
638+
return fmt.Errorf("failed to commit transaction: %w", err)
639+
}
640+
641+
return nil
642+
}
643+
529644
// Ensure PositionRepository implements the interface at compile time
530645
var _ domain.PositionRepository = (*PositionRepository)(nil)
531646

532-
// NOTE: No Update(), Upsert(), or Merge() methods are implemented.
647+
// NOTE: No general Update(), Upsert(), or Merge() methods are implemented.
533648
// This is intentional - the repository enforces append-only semantics.
649+
// Only SoftDelete (deleted_at) and UpdateAttributes (attributes) are allowed.
534650
// Position consolidation is handled at read time via GetAggregatedPosition().
535651

536652
// GetPositionCount returns the count of positions matching the criteria.

0 commit comments

Comments
 (0)