Skip to content

Commit 64e04b4

Browse files
committed
feat: add --merge flag to bd import for last-writer-wins conflict resolution
When importing a JSONL backup on a multi-machine setup, a stale snapshot can reopen locally-closed issues because the upsert blindly overwrites the status field. The new --merge flag compares updated_at timestamps and skips updates where the local record is same age or newer, preventing stale snapshots from overwriting local changes. Default import behaviour (faithful snapshot restore) is unchanged. Fixes pa-x34l.2.4
1 parent b404538 commit 64e04b4

File tree

6 files changed

+217
-9
lines changed

6 files changed

+217
-9
lines changed

cmd/bd/auto_import_upgrade.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func maybeAutoImportJSONL(ctx context.Context, s storage.DoltStorage, beadsDir s
3838
// Database is empty but JSONL has data — auto-import.
3939
fmt.Fprintf(os.Stderr, "auto-importing %d bytes from %s into empty database...\n", info.Size(), jsonlPath)
4040

41-
result, err := importFromLocalJSONLFull(ctx, s, jsonlPath)
41+
result, err := importFromLocalJSONLFull(ctx, s, jsonlPath, false)
4242
if err != nil {
4343
fmt.Fprintf(os.Stderr, "warning: auto-import from %s failed: %v\n", jsonlPath, err)
4444
fmt.Fprintf(os.Stderr, "\nYour issues are still safe in %s.\n", jsonlPath)

cmd/bd/import.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,20 @@ brings new issues, 'bd import' loads them into the local Dolt database.
2828
EXAMPLES:
2929
bd import # Import from .beads/issues.jsonl
3030
bd import backup.jsonl # Import from a specific file
31-
bd import --dry-run # Show what would be imported`,
31+
bd import --dry-run # Show what would be imported
32+
bd import --merge # Import, but skip issues where local is newer`,
3233
GroupID: "sync",
3334
RunE: runImport,
3435
}
3536

3637
var (
3738
importDryRun bool
39+
importMerge bool
3840
)
3941

4042
func init() {
4143
importCmd.Flags().BoolVar(&importDryRun, "dry-run", false, "Show what would be imported without importing")
44+
importCmd.Flags().BoolVar(&importMerge, "merge", false, "Use last-writer-wins: skip updates where the local record is newer (by updated_at)")
4245
rootCmd.AddCommand(importCmd)
4346
}
4447

@@ -78,7 +81,7 @@ func runImport(cmd *cobra.Command, args []string) error {
7881
return fmt.Errorf("no database — run 'bd init' or 'bd bootstrap' first")
7982
}
8083

81-
result, err := importFromLocalJSONLFull(ctx, store, jsonlPath)
84+
result, err := importFromLocalJSONLFull(ctx, store, jsonlPath, importMerge)
8285
if err != nil {
8386
return fmt.Errorf("import failed: %w", err)
8487
}

cmd/bd/import_from_jsonl_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,180 @@ func TestImportFromLocalJSONL(t *testing.T) {
378378
})
379379
}
380380

381+
func TestImportMergeByTimestamp(t *testing.T) {
382+
skipIfNoDolt(t)
383+
384+
t.Run("merge skips update when local record is newer", func(t *testing.T) {
385+
tmpDir := t.TempDir()
386+
dbPath := filepath.Join(tmpDir, "dolt")
387+
store := newTestStore(t, dbPath)
388+
ctx := context.Background()
389+
390+
// Step 1: Import an issue that is open, updated at T2.
391+
initial := `{"id":"test-merge1","title":"Original","status":"open","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-06-01T00:00:00Z"}
392+
`
393+
jsonlPath := filepath.Join(tmpDir, "issues.jsonl")
394+
if err := os.WriteFile(jsonlPath, []byte(initial), 0644); err != nil {
395+
t.Fatalf("write initial JSONL: %v", err)
396+
}
397+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, false); err != nil {
398+
t.Fatalf("initial import: %v", err)
399+
}
400+
401+
// Step 2: Simulate local close — re-import with status=closed at a newer T3.
402+
closed := `{"id":"test-merge1","title":"Original","status":"closed","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-09-01T00:00:00Z","closed_at":"2025-09-01T00:00:00Z","close_reason":"done"}
403+
`
404+
if err := os.WriteFile(jsonlPath, []byte(closed), 0644); err != nil {
405+
t.Fatalf("write closed JSONL: %v", err)
406+
}
407+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, false); err != nil {
408+
t.Fatalf("close import: %v", err)
409+
}
410+
411+
// Verify issue is closed.
412+
issue, err := store.GetIssue(ctx, "test-merge1")
413+
if err != nil {
414+
t.Fatalf("get issue after close: %v", err)
415+
}
416+
if issue.Status != "closed" {
417+
t.Fatalf("expected status 'closed', got %q", issue.Status)
418+
}
419+
420+
// Step 3: Import a STALE snapshot (T2, status=open) with --merge.
421+
// This simulates pulling a backup from another machine that predates
422+
// the local close.
423+
stale := `{"id":"test-merge1","title":"Stale title","status":"open","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-06-01T00:00:00Z"}
424+
`
425+
if err := os.WriteFile(jsonlPath, []byte(stale), 0644); err != nil {
426+
t.Fatalf("write stale JSONL: %v", err)
427+
}
428+
result, err := importFromLocalJSONLFull(ctx, store, jsonlPath, true)
429+
if err != nil {
430+
t.Fatalf("merge import: %v", err)
431+
}
432+
if result.Issues != 1 {
433+
t.Errorf("expected 1 issue processed, got %d", result.Issues)
434+
}
435+
436+
// Verify the issue is STILL closed — the stale snapshot was skipped.
437+
issue, err = store.GetIssue(ctx, "test-merge1")
438+
if err != nil {
439+
t.Fatalf("get issue after merge: %v", err)
440+
}
441+
if issue.Status != "closed" {
442+
t.Errorf("merge allowed stale snapshot to reopen issue: status=%q, want 'closed'", issue.Status)
443+
}
444+
if issue.Title != "Original" {
445+
t.Errorf("merge allowed stale snapshot to overwrite title: got %q, want 'Original'", issue.Title)
446+
}
447+
})
448+
449+
t.Run("merge allows update when incoming record is newer", func(t *testing.T) {
450+
tmpDir := t.TempDir()
451+
dbPath := filepath.Join(tmpDir, "dolt")
452+
store := newTestStore(t, dbPath)
453+
ctx := context.Background()
454+
455+
// Step 1: Import an issue at T1.
456+
initial := `{"id":"test-merge2","title":"Old title","status":"open","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-01-01T00:00:00Z"}
457+
`
458+
jsonlPath := filepath.Join(tmpDir, "issues.jsonl")
459+
if err := os.WriteFile(jsonlPath, []byte(initial), 0644); err != nil {
460+
t.Fatalf("write initial JSONL: %v", err)
461+
}
462+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, false); err != nil {
463+
t.Fatalf("initial import: %v", err)
464+
}
465+
466+
// Step 2: Import a NEWER snapshot with --merge. This should update.
467+
newer := `{"id":"test-merge2","title":"New title","status":"closed","priority":1,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-12-01T00:00:00Z","closed_at":"2025-12-01T00:00:00Z"}
468+
`
469+
if err := os.WriteFile(jsonlPath, []byte(newer), 0644); err != nil {
470+
t.Fatalf("write newer JSONL: %v", err)
471+
}
472+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, true); err != nil {
473+
t.Fatalf("merge import: %v", err)
474+
}
475+
476+
issue, err := store.GetIssue(ctx, "test-merge2")
477+
if err != nil {
478+
t.Fatalf("get issue after merge: %v", err)
479+
}
480+
if issue.Status != "closed" {
481+
t.Errorf("merge should have applied newer update: status=%q, want 'closed'", issue.Status)
482+
}
483+
if issue.Title != "New title" {
484+
t.Errorf("merge should have applied newer title: got %q, want 'New title'", issue.Title)
485+
}
486+
})
487+
488+
t.Run("without merge flag stale snapshot overwrites", func(t *testing.T) {
489+
// Control test: without --merge, the old behaviour applies.
490+
tmpDir := t.TempDir()
491+
dbPath := filepath.Join(tmpDir, "dolt")
492+
store := newTestStore(t, dbPath)
493+
ctx := context.Background()
494+
495+
// Import at T2 as closed.
496+
closed := `{"id":"test-merge3","title":"Closed issue","status":"closed","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-09-01T00:00:00Z","closed_at":"2025-09-01T00:00:00Z"}
497+
`
498+
jsonlPath := filepath.Join(tmpDir, "issues.jsonl")
499+
if err := os.WriteFile(jsonlPath, []byte(closed), 0644); err != nil {
500+
t.Fatalf("write JSONL: %v", err)
501+
}
502+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, false); err != nil {
503+
t.Fatalf("initial import: %v", err)
504+
}
505+
506+
// Import stale (T1, open) WITHOUT merge — should overwrite.
507+
stale := `{"id":"test-merge3","title":"Stale","status":"open","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-01-01T00:00:00Z"}
508+
`
509+
if err := os.WriteFile(jsonlPath, []byte(stale), 0644); err != nil {
510+
t.Fatalf("write stale JSONL: %v", err)
511+
}
512+
if _, err := importFromLocalJSONLFull(ctx, store, jsonlPath, false); err != nil {
513+
t.Fatalf("stale import: %v", err)
514+
}
515+
516+
issue, err := store.GetIssue(ctx, "test-merge3")
517+
if err != nil {
518+
t.Fatalf("get issue: %v", err)
519+
}
520+
if issue.Status != "open" {
521+
t.Errorf("without merge, stale import should overwrite: status=%q, want 'open'", issue.Status)
522+
}
523+
})
524+
525+
t.Run("merge creates new issues normally", func(t *testing.T) {
526+
tmpDir := t.TempDir()
527+
dbPath := filepath.Join(tmpDir, "dolt")
528+
store := newTestStore(t, dbPath)
529+
ctx := context.Background()
530+
531+
jsonl := `{"id":"test-merge4","title":"Brand new","status":"open","priority":2,"issue_type":"task","created_at":"2025-01-01T00:00:00Z","updated_at":"2025-01-01T00:00:00Z"}
532+
`
533+
jsonlPath := filepath.Join(tmpDir, "issues.jsonl")
534+
if err := os.WriteFile(jsonlPath, []byte(jsonl), 0644); err != nil {
535+
t.Fatalf("write JSONL: %v", err)
536+
}
537+
result, err := importFromLocalJSONLFull(ctx, store, jsonlPath, true)
538+
if err != nil {
539+
t.Fatalf("merge import of new issue: %v", err)
540+
}
541+
if result.Issues != 1 {
542+
t.Errorf("expected 1 issue, got %d", result.Issues)
543+
}
544+
545+
issue, err := store.GetIssue(ctx, "test-merge4")
546+
if err != nil {
547+
t.Fatalf("get issue: %v", err)
548+
}
549+
if issue.Title != "Brand new" {
550+
t.Errorf("expected title 'Brand new', got %q", issue.Title)
551+
}
552+
})
553+
}
554+
381555
func TestImportFromLocalJSONL_LegacyFormats(t *testing.T) {
382556
skipIfNoDolt(t)
383557

cmd/bd/import_shared.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type ImportOptions struct {
2525
DeletionIDs []string
2626
SkipPrefixValidation bool
2727
ProtectLocalExportIDs map[string]time.Time
28+
MergeByTimestamp bool
2829
}
2930

3031
// ImportResult describes what an import operation did.
@@ -53,6 +54,7 @@ func importIssuesCore(ctx context.Context, _ string, store storage.DoltStorage,
5354
err := store.CreateIssuesWithFullOptions(ctx, issues, getActorWithGit(), storage.BatchCreateOptions{
5455
OrphanHandling: storage.OrphanAllow,
5556
SkipPrefixValidation: opts.SkipPrefixValidation,
57+
MergeByTimestamp: opts.MergeByTimestamp,
5658
})
5759
if err != nil {
5860
return nil, err
@@ -78,7 +80,7 @@ type memoryRecord struct {
7880
// into the Dolt store. Returns the number of issues imported and any error.
7981
// This is a convenience wrapper around importFromLocalJSONLFull.
8082
func importFromLocalJSONL(ctx context.Context, store storage.DoltStorage, localPath string) (int, error) {
81-
result, err := importFromLocalJSONLFull(ctx, store, localPath)
83+
result, err := importFromLocalJSONLFull(ctx, store, localPath, false)
8284
if err != nil {
8385
return 0, err
8486
}
@@ -88,7 +90,9 @@ func importFromLocalJSONL(ctx context.Context, store storage.DoltStorage, localP
8890
// importFromLocalJSONLFull imports issues and memories from a local JSONL file.
8991
// It detects memory records (lines with "_type":"memory") and imports them
9092
// via SetConfig, while routing regular issue records through the normal path.
91-
func importFromLocalJSONLFull(ctx context.Context, store storage.DoltStorage, localPath string) (*importLocalResult, error) {
93+
// When merge is true, existing issues are only updated if the incoming record
94+
// has a strictly newer updated_at timestamp (last-writer-wins).
95+
func importFromLocalJSONLFull(ctx context.Context, store storage.DoltStorage, localPath string, merge bool) (*importLocalResult, error) {
9296
//nolint:gosec // G304: path from user-provided CLI argument
9397
data, err := os.ReadFile(localPath)
9498
if err != nil {
@@ -182,6 +186,7 @@ func importFromLocalJSONLFull(ctx context.Context, store storage.DoltStorage, lo
182186

183187
opts := ImportOptions{
184188
SkipPrefixValidation: true,
189+
MergeByTimestamp: merge,
185190
}
186191
_, err = importIssuesCore(ctx, "", store, issues, opts)
187192
if err != nil {

internal/storage/batch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,10 @@ type BatchCreateOptions struct {
2222
OrphanHandling OrphanHandling
2323
// SkipPrefixValidation skips prefix validation for existing IDs (used during import)
2424
SkipPrefixValidation bool
25+
// MergeByTimestamp enables last-writer-wins conflict resolution during import.
26+
// When true, existing issues are only updated if the incoming record has a
27+
// newer updated_at timestamp than the local record. This prevents stale
28+
// snapshots from overwriting locally-modified issues (e.g. reopening a
29+
// locally-closed issue).
30+
MergeByTimestamp bool
2531
}

internal/storage/issueops/create.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func CreateIssueInTx(ctx context.Context, tx *sql.Tx, bc *BatchContext, issue *t
8484
return nil
8585
}
8686

87-
isNew, err := InsertIssueIfNew(ctx, tx, issueTable, issue)
87+
isNew, err := InsertIssueIfNew(ctx, tx, issueTable, issue, bc.Opts.MergeByTimestamp)
8888
if err != nil {
8989
return err
9090
}
@@ -237,19 +237,39 @@ func CheckOrphan(ctx context.Context, tx *sql.Tx, issue *types.Issue, issueTable
237237
}
238238

239239
// InsertIssueIfNew inserts the issue and returns whether it was genuinely new.
240+
// When mergeByTimestamp is true, existing issues are only updated if the
241+
// incoming record's updated_at is strictly newer than the local record's.
240242
//
241243
//nolint:gosec // G201: table is a hardcoded constant
242-
func InsertIssueIfNew(ctx context.Context, tx *sql.Tx, issueTable string, issue *types.Issue) (isNew bool, err error) {
243-
var existingCount int
244+
func InsertIssueIfNew(ctx context.Context, tx *sql.Tx, issueTable string, issue *types.Issue, mergeByTimestamp bool) (isNew bool, err error) {
244245
if issue.ID != "" {
246+
var existingCount int
245247
if err := tx.QueryRowContext(ctx, fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE id = ?`, issueTable), issue.ID).Scan(&existingCount); err != nil {
246248
return false, fmt.Errorf("failed to check issue existence for %s: %w", issue.ID, err)
247249
}
250+
if existingCount > 0 {
251+
if mergeByTimestamp {
252+
var localUpdatedAt time.Time
253+
if err := tx.QueryRowContext(ctx, fmt.Sprintf(`SELECT updated_at FROM %s WHERE id = ?`, issueTable), issue.ID).Scan(&localUpdatedAt); err != nil {
254+
return false, fmt.Errorf("failed to read updated_at for %s: %w", issue.ID, err)
255+
}
256+
if !issue.UpdatedAt.After(localUpdatedAt) {
257+
// Local record is same age or newer — skip update.
258+
return false, nil
259+
}
260+
}
261+
// Existing issue, proceed with upsert.
262+
if err := InsertIssueIntoTable(ctx, tx, issueTable, issue); err != nil {
263+
return false, fmt.Errorf("failed to update issue %s: %w", issue.ID, err)
264+
}
265+
return false, nil
266+
}
248267
}
268+
// New issue — insert.
249269
if err := InsertIssueIntoTable(ctx, tx, issueTable, issue); err != nil {
250270
return false, fmt.Errorf("failed to insert issue %s: %w", issue.ID, err)
251271
}
252-
return existingCount == 0, nil
272+
return true, nil
253273
}
254274

255275
// PersistLabels writes issue.Labels into the appropriate labels table.

0 commit comments

Comments
 (0)