Skip to content

Commit d354a6b

Browse files
svsgooglecopybara-github
authored andcommitted
Add revision_history_index to PersistedIncrementalDataSliceManager.from_persistence_dir.
This new parameter allows initializing a `PersistedIncrementalDataSliceManager` from a specific revision in the persistence directory's history, rather than always from the latest state. PiperOrigin-RevId: 861670440 Change-Id: I8f951c06686c9ad6596fdd6fb22c0623673b6db0
1 parent 6e84e8b commit d354a6b

File tree

2 files changed

+178
-31
lines changed

2 files changed

+178
-31
lines changed

py/koladata/ext/persisted_data/persisted_incremental_data_slice_manager.py

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ def create_from_dir(
291291
cls,
292292
persistence_dir: str,
293293
*,
294+
at_revision_history_index: int | None = None,
294295
fs: fs_interface.FileSystemInterface | None = None,
295296
) -> PersistedIncrementalDataSliceManager:
296297
"""Initializes a manager from an existing persistence directory.
@@ -301,6 +302,15 @@ def create_from_dir(
301302
manager should be initialized. Updates to the data and metadata will be
302303
persisted to this directory; call returned_manager.branch(...) if you
303304
want to persist updates to a different directory.
305+
at_revision_history_index: The index of the revision in the revision
306+
history that should be used as the basis for the resulting manager. The
307+
initial state of the manager's DataSlice will be the same as it was
308+
right after the revision at the given index was created. The value of
309+
at_revision_history_index must be a valid non-negative index number. By
310+
default, the manager is created on top of the latest revision found in
311+
the persistence directory. If you have a directory and want to see which
312+
revisions are available, you can simply call
313+
create_from_dir(persistence_dir).get_revision_history().
304314
fs: All interactions with the file system will go through this instance.
305315
If None, then the default interaction with the file system is used.
306316
"""
@@ -312,7 +322,7 @@ def create_from_dir(
312322
' persisted previously by a PersistedIncrementalDataSliceManager'
313323
)
314324

315-
metadata = _read_latest_metadata(fs, persistence_dir)
325+
metadata = _read_metadata(fs, persistence_dir, at_revision_history_index)
316326
initial_data_manager_class = (
317327
initial_data_manager_registry.get_initial_data_manager_class(
318328
metadata.initial_data_manager_id
@@ -1073,8 +1083,8 @@ def branch(
10731083
< len(self._metadata.revision_history)
10741084
):
10751085
raise ValueError(
1076-
f'revision_history_index {revision_history_index} is out of bounds.'
1077-
' Valid values are in the range'
1086+
f'revision_history_index value {revision_history_index} is out of'
1087+
' bounds. Valid values are in the range'
10781088
f' [{-len(self._metadata.revision_history)},'
10791089
f' {len(self._metadata.revision_history)})'
10801090
)
@@ -1280,10 +1290,6 @@ def _get_initial_schema_node_name_to_bag_names_filepath(
12801290
)
12811291

12821292

1283-
def _get_metadata_filepath(persistence_dir: str) -> str:
1284-
return os.path.join(persistence_dir, 'metadata.pb')
1285-
1286-
12871293
def _get_uuid() -> str:
12881294
return str(uuid.uuid1())
12891295

@@ -1292,6 +1298,31 @@ def _get_uuid() -> str:
12921298
_UPDATE_NUMBER_NUM_DIGITS = 12
12931299

12941300

1301+
def _format_update_number(update_number: int) -> str:
1302+
return str(update_number).zfill(_UPDATE_NUMBER_NUM_DIGITS)
1303+
1304+
1305+
def _get_metadata_filepath(persistence_dir: str, at_version: int) -> str:
1306+
return os.path.join(
1307+
persistence_dir, f'metadata-{_format_update_number(at_version)}.pb'
1308+
)
1309+
1310+
1311+
def _get_latest_metadata_version(
1312+
fs: fs_interface.FileSystemInterface,
1313+
persistence_dir: str,
1314+
) -> int:
1315+
update_number_pattern = '[0-9]' * _UPDATE_NUMBER_NUM_DIGITS
1316+
committed_metadata_filepaths = fs.glob(
1317+
os.path.join(persistence_dir, f'metadata-{update_number_pattern}.pb')
1318+
)
1319+
latest_metadata_filepath = max(committed_metadata_filepaths)
1320+
latest_metadata_filename = os.path.basename(latest_metadata_filepath)
1321+
return int(
1322+
latest_metadata_filename.removeprefix('metadata-').removesuffix('.pb')
1323+
)
1324+
1325+
12951326
def _persist_metadata(
12961327
fs: fs_interface.FileSystemInterface,
12971328
persistence_dir: str,
@@ -1310,10 +1341,9 @@ def _persist_metadata(
13101341
persistence_dir: The persistence directory of the manager.
13111342
metadata: The metadata to persist.
13121343
"""
1313-
update_number = str(metadata.metadata_update_number).zfill(
1314-
_UPDATE_NUMBER_NUM_DIGITS
1344+
final_filepath = _get_metadata_filepath(
1345+
persistence_dir, metadata.metadata_update_number
13151346
)
1316-
final_filepath = os.path.join(persistence_dir, f'metadata-{update_number}.pb')
13171347
if fs.exists(final_filepath):
13181348
# Fail fast if the metadata already exists. No point in writing a temporary
13191349
# file that will end up not being used because of this error.
@@ -1329,23 +1359,46 @@ def _persist_metadata(
13291359
)
13301360
temp_filepath = os.path.join(
13311361
persistence_dir,
1332-
f'metadata-{update_number}-{_get_uuid()}.pb',
1362+
f'metadata-{_format_update_number(metadata.metadata_update_number)}-{_get_uuid()}.pb',
13331363
)
13341364
with fs.open(temp_filepath, 'wb') as f:
13351365
f.write(metadata.SerializeToString())
13361366
fs.rename(temp_filepath, final_filepath, overwrite=False)
13371367

13381368

1339-
def _read_latest_metadata(
1369+
def _read_metadata(
13401370
fs: fs_interface.FileSystemInterface,
13411371
persistence_dir: str,
1372+
at_revision_history_index: int | None,
13421373
) -> metadata_pb2.PersistedIncrementalDataSliceManagerMetadata:
1343-
update_number_pattern = '[0-9]' * _UPDATE_NUMBER_NUM_DIGITS
1344-
committed_metadata_filepaths = fs.glob(
1345-
os.path.join(persistence_dir, f'metadata-{update_number_pattern}.pb')
1374+
"""Reads the metadata from disk.
1375+
1376+
If at_revision_history_index is None, the latest metadata is read.
1377+
1378+
Args:
1379+
fs: The operations to use to interact with the file system.
1380+
persistence_dir: The persistence directory of the manager.
1381+
at_revision_history_index: The version of the metadata to read. If None, the
1382+
latest metadata is read.
1383+
1384+
Returns:
1385+
The metadata read from disk.
1386+
"""
1387+
latest_metadata_version = _get_latest_metadata_version(fs, persistence_dir)
1388+
if at_revision_history_index is None:
1389+
at_revision_history_index = latest_metadata_version
1390+
else:
1391+
if not (0 <= at_revision_history_index <= latest_metadata_version):
1392+
raise ValueError(
1393+
f'at_revision_history_index value {at_revision_history_index} is out'
1394+
' of bounds. Valid values are in the range'
1395+
f' [0, {latest_metadata_version}]'
1396+
)
1397+
1398+
metadata_filepath = _get_metadata_filepath(
1399+
persistence_dir, at_revision_history_index
13461400
)
1347-
latest_metadata_filepath = max(committed_metadata_filepaths)
1348-
with fs.open(latest_metadata_filepath, 'rb') as f:
1401+
with fs.open(metadata_filepath, 'rb') as f:
13491402
return metadata_pb2.PersistedIncrementalDataSliceManagerMetadata.FromString(
13501403
f.read()
13511404
)

py/koladata/ext/persisted_data/persisted_incremental_data_slice_manager_test.py

Lines changed: 108 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,8 +3824,10 @@ def test_revision_history_is_tracked_in_metadata(self, _):
38243824
)
38253825
# Metadata is written to disk:
38263826
self.assertEqual(
3827-
persisted_incremental_data_slice_manager._read_latest_metadata(
3828-
fs_implementation.FileSystemInteraction(), persistence_dir
3827+
persisted_incremental_data_slice_manager._read_metadata(
3828+
fs_implementation.FileSystemInteraction(),
3829+
persistence_dir,
3830+
at_revision_history_index=None,
38293831
),
38303832
manager._metadata,
38313833
)
@@ -3873,8 +3875,10 @@ def test_revision_history_is_tracked_in_metadata(self, _):
38733875
)
38743876
# Metadata is written to disk:
38753877
self.assertEqual(
3876-
persisted_incremental_data_slice_manager._read_latest_metadata(
3877-
fs_implementation.FileSystemInteraction(), persistence_dir
3878+
persisted_incremental_data_slice_manager._read_metadata(
3879+
fs_implementation.FileSystemInteraction(),
3880+
persistence_dir,
3881+
at_revision_history_index=None,
38783882
),
38793883
manager._metadata,
38803884
)
@@ -3915,8 +3919,10 @@ def test_revision_history_is_tracked_in_metadata(self, _):
39153919
)
39163920
# Metadata is written to disk:
39173921
self.assertEqual(
3918-
persisted_incremental_data_slice_manager._read_latest_metadata(
3919-
fs_implementation.FileSystemInteraction(), branch_dir
3922+
persisted_incremental_data_slice_manager._read_metadata(
3923+
fs_implementation.FileSystemInteraction(),
3924+
branch_dir,
3925+
at_revision_history_index=None,
39203926
),
39213927
branch_manager._metadata,
39223928
)
@@ -3967,8 +3973,10 @@ def test_revision_history_is_tracked_in_metadata(self, _):
39673973
)
39683974
# Metadata is written to disk:
39693975
self.assertEqual(
3970-
persisted_incremental_data_slice_manager._read_latest_metadata(
3971-
fs_implementation.FileSystemInteraction(), branch_dir
3976+
persisted_incremental_data_slice_manager._read_metadata(
3977+
fs_implementation.FileSystemInteraction(),
3978+
branch_dir,
3979+
at_revision_history_index=None,
39723980
),
39733981
branch_manager._metadata,
39743982
)
@@ -3987,8 +3995,10 @@ def test_revision_history_is_tracked_in_metadata(self, _):
39873995
self.assertEqual(manager._metadata.revision_history[0], revision_0)
39883996
self.assertEqual(manager._metadata.revision_history[1], revision_1)
39893997
self.assertEqual(
3990-
persisted_incremental_data_slice_manager._read_latest_metadata(
3991-
fs_implementation.FileSystemInteraction(), persistence_dir
3998+
persisted_incremental_data_slice_manager._read_metadata(
3999+
fs_implementation.FileSystemInteraction(),
4000+
persistence_dir,
4001+
at_revision_history_index=None,
39924002
),
39934003
manager._metadata,
39944004
)
@@ -4230,8 +4240,8 @@ def test_branch_with_invalid_and_valid_revision_history_index(self):
42304240
with self.assertRaisesRegex(
42314241
ValueError,
42324242
re.escape(
4233-
f'revision_history_index {too_small_index} is out of bounds.'
4234-
' Valid values are in the range [-3, 3)'
4243+
f'revision_history_index value {too_small_index} is out of'
4244+
' bounds. Valid values are in the range [-3, 3)'
42354245
),
42364246
):
42374247
trunk_manager.branch(branch_dir, revision_history_index=too_small_index)
@@ -4241,8 +4251,8 @@ def test_branch_with_invalid_and_valid_revision_history_index(self):
42414251
with self.assertRaisesRegex(
42424252
ValueError,
42434253
re.escape(
4244-
f'revision_history_index {too_large_index} is out of bounds.'
4245-
' Valid values are in the range [-3, 3)'
4254+
f'revision_history_index value {too_large_index} is out of'
4255+
' bounds. Valid values are in the range [-3, 3)'
42464256
),
42474257
):
42484258
trunk_manager.branch(branch_dir, revision_history_index=too_large_index)
@@ -4671,6 +4681,90 @@ def test_get_schema_at(self):
46714681
ids_equality=True,
46724682
)
46734683

4684+
def test_create_from_dir_at_revision_history_index(self):
4685+
persistence_dir = self.create_tempdir().full_path
4686+
manager = PersistedIncrementalDataSliceManager.create_new(persistence_dir)
4687+
ds_at_revision_0 = manager.get_data_slice(
4688+
populate_including_descendants={parse_dsp('')}
4689+
)
4690+
4691+
manager.update(
4692+
at_path=parse_dsp(''),
4693+
attr_name='query',
4694+
attr_value=kd.list([kd.new(query_id='q1')]),
4695+
description='Added queries with only query_id populated',
4696+
)
4697+
ds_at_revision_1 = manager.get_data_slice(
4698+
populate_including_descendants={parse_dsp('')},
4699+
)
4700+
4701+
manager.update(
4702+
at_path=parse_dsp('.query[:]'),
4703+
attr_name='doc',
4704+
attr_value=kd.new(doc_id=kd.slice([0, 1, 2, 3])).implode(),
4705+
description='Added docs to queries',
4706+
)
4707+
ds_at_revision_2 = manager.get_data_slice(
4708+
populate_including_descendants={parse_dsp('')},
4709+
)
4710+
4711+
new_manager = PersistedIncrementalDataSliceManager.create_from_dir(
4712+
persistence_dir, at_revision_history_index=0
4713+
)
4714+
kd.testing.assert_equivalent(
4715+
new_manager.get_data_slice(
4716+
populate_including_descendants={parse_dsp('')}
4717+
),
4718+
ds_at_revision_0,
4719+
ids_equality=True,
4720+
)
4721+
4722+
new_manager = PersistedIncrementalDataSliceManager.create_from_dir(
4723+
persistence_dir, at_revision_history_index=1
4724+
)
4725+
kd.testing.assert_equivalent(
4726+
new_manager.get_data_slice(
4727+
populate_including_descendants={parse_dsp('')}
4728+
),
4729+
ds_at_revision_1,
4730+
ids_equality=True,
4731+
)
4732+
4733+
new_manager = PersistedIncrementalDataSliceManager.create_from_dir(
4734+
persistence_dir, at_revision_history_index=2
4735+
)
4736+
kd.testing.assert_equivalent(
4737+
new_manager.get_data_slice(
4738+
populate_including_descendants={parse_dsp('')}
4739+
),
4740+
ds_at_revision_2,
4741+
ids_equality=True,
4742+
)
4743+
4744+
with self.subTest('negative_value'):
4745+
with self.assertRaisesRegex(
4746+
ValueError,
4747+
re.escape(
4748+
'at_revision_history_index value -1 is out of bounds. Valid'
4749+
' values are in the range [0, 2]'
4750+
),
4751+
):
4752+
PersistedIncrementalDataSliceManager.create_from_dir(
4753+
persistence_dir, at_revision_history_index=-1
4754+
)
4755+
4756+
with self.subTest('too_large_value'):
4757+
with self.assertRaisesRegex(
4758+
ValueError,
4759+
re.escape(
4760+
'at_revision_history_index value 3 is out of bounds. Valid values'
4761+
' are in the range [0, 2]'
4762+
),
4763+
):
4764+
PersistedIncrementalDataSliceManager.create_from_dir(
4765+
persistence_dir, at_revision_history_index=3
4766+
)
4767+
46744768

46754769
if __name__ == '__main__':
46764770
absltest.main()

0 commit comments

Comments
 (0)