@@ -72,37 +72,40 @@ func (jd *HandleT) backupDSLoop(ctx context.Context) {
72
72
opPayload , err := json .Marshal (& backupDS )
73
73
jd .assertError (err )
74
74
75
- opID , err := jd .JournalMarkStart (backupDSOperation , opPayload )
76
- if err != nil {
77
- return fmt .Errorf ("mark start of backup operation: %w" , err )
78
- }
79
- err = jd .backupDS (ctx , backupDSRange )
80
- if err != nil {
81
- return fmt .Errorf ("backup dataset: %w" , err )
82
- }
83
- err = jd .JournalMarkDone (opID )
84
- if err != nil {
85
- return fmt .Errorf ("mark end of backup operation: %w" , err )
75
+ if err := jd .WithTx (func (tx * Tx ) error {
76
+ opID , err := jd .JournalMarkStartInTx (tx , backupDSOperation , opPayload )
77
+ if err != nil {
78
+ return fmt .Errorf ("mark start of backup operation: %w" , err )
79
+ }
80
+ if err := jd .backupDS (ctx , backupDSRange ); err != nil {
81
+ return fmt .Errorf ("backup dataset: %w" , err )
82
+ }
83
+ if err := jd .journalMarkDoneInTx (tx , opID ); err != nil {
84
+ return fmt .Errorf ("mark end of backup operation: %w" , err )
85
+ }
86
+ return nil
87
+ }); err != nil {
88
+ return err
86
89
}
87
90
88
- // drop dataset after successfully uploading both jobs and jobs_status to s3
89
- opID , err = jd . JournalMarkStart ( backupDropDSOperation , opPayload )
90
- if err != nil {
91
- return fmt . Errorf ( "mark start of drop backup operation: %w" , err )
92
- }
93
- // Currently, we retry uploading a table for some time & if it fails. We only drop that table & not all `pre_drop` tables.
94
- // So, in situation when new table creation rate is more than drop . We will still have pipe up issue .
95
- // An easy way to fix this is, if at any point of time exponential retry fails then instead of just dropping that particular
96
- // table drop all subsequent `pre_drop` table. As, most likely the upload of rest of the table will also fail with the same error.
97
- err = jd . dropDS ( backupDS )
98
- if err != nil {
99
- return fmt .Errorf (" drop dataset: %w" , err )
100
- }
101
- err = jd .JournalMarkDone ( opID )
102
- if err != nil {
103
- return fmt . Errorf ( "mark end of drop backup operation: %w" , err )
104
- }
105
- return nil
91
+ return jd . WithTx ( func ( tx * Tx ) error {
92
+ // drop dataset after successfully uploading both jobs and jobs_status to s3
93
+ opID , err := jd . JournalMarkStartInTx ( tx , backupDropDSOperation , opPayload )
94
+ if err != nil {
95
+ return fmt . Errorf ( "mark start of drop backup operation: %w" , err )
96
+ }
97
+ // Currently, we retry uploading a table for some time & if it fails . We only drop that table & not all `pre_drop` tables .
98
+ // So, in situation when new table creation rate is more than drop. We will still have pipe up issue.
99
+ // An easy way to fix this is, if at any point of time exponential retry fails then instead of just dropping that particular
100
+ // table drop all subsequent `pre_drop` table. As, most likely the upload of rest of the table will also fail with the same error.
101
+ if err := jd . dropDSInTx ( tx , backupDS ); err != nil {
102
+ return fmt .Errorf (" drop dataset: %w" , err )
103
+ }
104
+ if err : = jd .journalMarkDoneInTx ( tx , opID ); err != nil {
105
+ return fmt . Errorf ( "mark end of drop backup operation: %w" , err )
106
+ }
107
+ return nil
108
+ })
106
109
}
107
110
if err := loop (); err != nil && ctx .Err () == nil {
108
111
if ! jd .skipMaintenanceError {
0 commit comments