Skip to content

Commit a443f72

Browse files
authored
Commit learns how to rebase in case of conflict (#920)
`Session.commit` accepts new arguments: ``` rebase_with: ConflictSolver | None = None, rebase_tries: int = 5 ``` It uses the `ConflictSolver` to call `Session.rebase` in a loop, up to `rebase_tries` times, and try to commit after each rebase.
1 parent 3e44f49 commit a443f72

File tree

7 files changed

+211
-49
lines changed

7 files changed

+211
-49
lines changed

docs/docs/icechunk-python/version-control.md

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ print(session.commit(message="Update foo attribute on root group"))
7272

7373
With a few snapshots committed, we can take a look at the ancestry of the `main` branch:
7474

75-
7675
```python exec="on" session="version" source="material-block" result="code"
7776
for snapshot in repo.ancestry(branch="main"):
7877
print(snapshot)
@@ -89,7 +88,6 @@ gitGraph
8988
""".format(*[snap.id[:6] for snap in repo.ancestry(branch="main")]))
9089
```
9190

92-
9391
## Time Travel
9492

9593
Now that we've created a new snapshot, we can time-travel back to the previous snapshot using the snapshot ID.
@@ -193,7 +191,6 @@ For example to tag the second commit in `main`'s history:
193191
repo.create_tag("v1.0.0", snapshot_id=list(repo.ancestry(branch="main"))[1].id)
194192
```
195193

196-
197194
Because tags are immutable, we need to use a readonly `Session` to access the data referenced by a tag.
198195

199196
```python exec="on" session="version" source="material-block" result="code"
@@ -353,6 +350,7 @@ session = repo.readonly_session(branch="main")
353350
```
354351

355352
Lastly, if you make changes to non-conflicting chunks or attributes, you can rebase without having to resolve any conflicts.
353+
This time we will show how to use rebase automatically during the `commit` call:
356354

357355
```python
358356
session1 = repo.writable_session("main")
@@ -365,16 +363,9 @@ root1["data"][3,:] = 3
365363
root2["data"][4,:] = 4
366364

367365
session1.commit(message="Update fourth row of data array")
366+
session2.commit(message="Update fifth row of data array", rebase_with=icechunk.ConflictDetector())
367+
print("Rebase+commit succeeded")
368368

369-
try:
370-
session2.rebase(icechunk.ConflictDetector())
371-
print("Rebase succeeded")
372-
except icechunk.RebaseFailedError as e:
373-
print(e.conflicts)
374-
375-
session2.commit(message="Update fifth row of data array")
376-
377-
# Rebase succeeded
378369
```
379370

380371
And now we can see the data in the `data` array to confirm that the changes were committed correctly.

icechunk-python/examples/bank_accounts.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,6 @@ def slow_transfer_task(repo: icechunk.Repository) -> WireResult:
138138
return res
139139

140140

141-
def rebase_loop(session: icechunk.Session, message: str) -> None:
142-
while True:
143-
try:
144-
session.commit(message)
145-
return
146-
except icechunk.ConflictError:
147-
session.rebase(icechunk.ConflictDetector())
148-
149-
150141
def rebase_transfer_task(repo: icechunk.Repository) -> WireResult:
151142
"""Safe and fast approach to concurrent transfers.
152143
@@ -169,7 +160,10 @@ def rebase_transfer_task(repo: icechunk.Repository) -> WireResult:
169160
return res
170161
if res == WireResult.DONE:
171162
try:
172-
rebase_loop(session, f"wired ${amount}: {from_account} -> {to_account}")
163+
session.commit(
164+
f"wired ${amount}: {from_account} -> {to_account}",
165+
rebase_with=icechunk.ConflictDetector(),
166+
)
173167
return WireResult.DONE
174168
except icechunk.RebaseFailedError:
175169
pass

icechunk-python/python/icechunk/_icechunk_python.pyi

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,13 @@ class PySession:
10661066
@property
10671067
def store(self) -> PyStore: ...
10681068
def merge(self, other: PySession) -> None: ...
1069-
def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str: ...
1069+
def commit(
1070+
self,
1071+
message: str,
1072+
metadata: dict[str, Any] | None = None,
1073+
rebase_with: ConflictSolver | None = None,
1074+
rebase_tries: int = 1_000,
1075+
) -> str: ...
10701076
def rebase(self, solver: ConflictSolver) -> None: ...
10711077

10721078
class PyStore:

icechunk-python/python/icechunk/session.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,13 @@ def merge(self, other: Self) -> None:
250250
"""
251251
self._session.merge(other._session)
252252

253-
def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str:
253+
def commit(
254+
self,
255+
message: str,
256+
metadata: dict[str, Any] | None = None,
257+
rebase_with: ConflictSolver | None = None,
258+
rebase_tries: int = 1_000,
259+
) -> str:
254260
"""
255261
Commit the changes in the session to the repository.
256262
@@ -264,6 +270,10 @@ def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str:
264270
The message to write with the commit.
265271
metadata : dict[str, Any] | None, optional
266272
Additional metadata to store with the commit snapshot.
273+
rebase_with : ConflictSolver | None, optional
274+
If other session committed while the current session was writing, use Session.rebase with this solver.
275+
rebase_tries : int, optional
276+
If other session committed while the current session was writing, use Session.rebase up to this many times in a loop.
267277
268278
Returns
269279
-------
@@ -276,9 +286,13 @@ def commit(self, message: str, metadata: dict[str, Any] | None = None) -> str:
276286
If the session is out of date and a conflict occurs.
277287
"""
278288
try:
279-
return self._session.commit(message, metadata)
289+
return self._session.commit(
290+
message, metadata, rebase_with=rebase_with, rebase_tries=rebase_tries
291+
)
280292
except PyConflictError as e:
281293
raise ConflictError(e) from None
294+
except PyRebaseFailedError as e:
295+
raise RebaseFailedError(e) from None
282296

283297
def rebase(self, solver: ConflictSolver) -> None:
284298
"""

icechunk-python/src/session.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,23 +180,35 @@ impl PySession {
180180
})
181181
}
182182

183-
#[pyo3(signature = (message, metadata=None))]
183+
#[pyo3(signature = (message, metadata=None, rebase_with=None, rebase_tries=1_000))]
184184
pub fn commit(
185185
&self,
186186
py: Python<'_>,
187187
message: &str,
188188
metadata: Option<PySnapshotProperties>,
189+
rebase_with: Option<PyConflictSolver>,
190+
rebase_tries: Option<u16>,
189191
) -> PyResult<String> {
192+
let metadata = metadata.map(|m| m.into());
190193
// This is blocking function, we need to release the Gil
191194
py.allow_threads(move || {
192195
pyo3_async_runtimes::tokio::get_runtime().block_on(async {
193-
let snapshot_id = self
194-
.0
195-
.write()
196-
.await
197-
.commit(message, metadata.map(|m| m.into()))
198-
.await
199-
.map_err(PyIcechunkStoreError::SessionError)?;
196+
let mut session = self.0.write().await;
197+
let snapshot_id = if let Some(solver) = rebase_with {
198+
session
199+
.commit_rebasing(
200+
solver.as_ref(),
201+
rebase_tries.unwrap_or(1_000),
202+
message,
203+
metadata,
204+
|_| async {},
205+
|_| async {},
206+
)
207+
.await
208+
} else {
209+
session.commit(message, metadata).await
210+
}
211+
.map_err(PyIcechunkStoreError::SessionError)?;
200212
Ok(snapshot_id.to_string())
201213
})
202214
})

icechunk-python/tests/test_conflicts.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,7 @@ def test_rebase_no_conflicts(repo: icechunk.Repository) -> None:
8383
array_b = cast(zarr.Array, root_b["foo/bar/some-array"])
8484
array_b.attrs["repo"] = 2
8585

86-
session_b.rebase(icechunk.ConflictDetector())
87-
session_b.commit("update array")
86+
session_b.commit("update array", rebase_with=icechunk.ConflictDetector())
8887

8988
session_c = repo.readonly_session(branch="main")
9089
store_c = session_c.store
@@ -94,13 +93,7 @@ def test_rebase_no_conflicts(repo: icechunk.Repository) -> None:
9493
assert array_c.attrs["repo"] == 2
9594

9695

97-
@pytest.mark.parametrize(
98-
"on_chunk_conflict",
99-
[icechunk.VersionSelection.UseOurs, icechunk.VersionSelection.UseTheirs],
100-
)
101-
def test_rebase_fails_on_user_atts_double_edit(
102-
repo: icechunk.Repository, on_chunk_conflict: icechunk.VersionSelection
103-
) -> None:
96+
def test_rebase_fails_on_user_atts_double_edit(repo: icechunk.Repository) -> None:
10497
session_a = repo.writable_session("main")
10598
session_b = repo.writable_session("main")
10699
store_a = session_a.store
@@ -120,7 +113,7 @@ def test_rebase_fails_on_user_atts_double_edit(
120113

121114
# Make sure it fails if the resolver is not set
122115
with pytest.raises(icechunk.RebaseFailedError):
123-
session_b.rebase(icechunk.BasicConflictSolver())
116+
session_b.commit("update array", rebase_with=icechunk.BasicConflictSolver())
124117

125118

126119
@pytest.mark.parametrize(
@@ -150,10 +143,11 @@ def test_rebase_chunks_with_ours(
150143
# Make sure it fails if the resolver is not set
151144
with pytest.raises(icechunk.RebaseFailedError):
152145
try:
153-
session_b.rebase(
154-
icechunk.BasicConflictSolver(
146+
session_b.commit(
147+
"update first column of array",
148+
rebase_with=icechunk.BasicConflictSolver(
155149
on_chunk_conflict=icechunk.VersionSelection.Fail
156-
)
150+
),
157151
)
158152
except icechunk.RebaseFailedError as e:
159153
assert e.conflicts[0].path == "/foo/bar/some-array"
@@ -184,8 +178,7 @@ def test_rebase_chunks_with_ours(
184178
on_chunk_conflict=on_chunk_conflict,
185179
)
186180

187-
session_b.rebase(solver)
188-
session_b.commit("after conflict")
181+
session_b.commit("after conflict", rebase_with=solver)
189182

190183
session_c = repo.readonly_session(branch="main")
191184
store_c = session_c.store

0 commit comments

Comments
 (0)