Implement Arrow cache reader/writer #856
    batch : pa.RecordBatch | dict[str, list | np.ndarray | pa.Array]
        A batch of columnar data.
    """
    if isinstance(batch, dict):
Same question as above: do we need to handle multiple types here, or could we convert them before we get to this point?
I've added write_columns to deal with the conversion. We now have write_rows and write_columns that perform a conversion and then call write_batch.
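For anyone reading along, here is a rough sketch of that shape: two conversion entry points feeding one write path. It is not the code in this PR. `SketchWriter` is a hypothetical name, and a plain dict of lists stands in for `pa.RecordBatch` so the example runs without PyArrow.

```python
from typing import Any, Iterable


class SketchWriter:
    """Hypothetical stand-in for the cache writer; not the PR's class."""

    def __init__(self) -> None:
        self.batches = []

    def write_batch(self, batch: dict) -> None:
        # Single write path: every column must have the same length.
        lengths = {len(values) for values in batch.values()}
        if len(lengths) > 1:
            raise ValueError("columns have mismatched lengths")
        self.batches.append(batch)

    def write_columns(self, columns: dict) -> None:
        # Normalize any iterable column (list, tuple, array) to a list.
        self.write_batch({name: list(values) for name, values in columns.items()})

    def write_rows(self, rows: list) -> None:
        # Pivot row dicts into columns, then reuse the same write path.
        if not rows:
            return
        names = list(rows[0])
        self.write_batch({name: [row[name] for row in rows] for name in names})


writer = SketchWriter()
writer.write_rows([{"id": 1, "score": 0.9}, {"id": 2, "score": 0.4}])
writer.write_columns({"id": (3, 4), "score": (0.1, 0.7)})
```

With this split, `write_batch` only ever sees one type, so the `isinstance` check is no longer needed on the write path.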
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures data is flushed."""
        self.flush()
If flush, or indeed any call made while this object's _writer is open, raises an error, the writer won't get closed properly. Not sure that's a big deal unless we want to try to handle errors that might happen, which sounds more difficult.
Yeah, I'm not sure this is worth diving into. The most likely cause of an error is malformed annotations, in which case we already cannot use the cache.
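If this ever does become worth handling, one low-cost option is a `try`/`finally` in `__exit__`. The sketch below is only an illustration: `SketchWriter`, its `close` method and the `fail_on_flush` switch are hypothetical and stand in for whatever actually owns `_writer`.

```python
class SketchWriter:
    """Hypothetical writer used only to show the try/finally pattern."""

    def __init__(self, fail_on_flush: bool = False) -> None:
        self._fail_on_flush = fail_on_flush
        self.closed = False

    def flush(self) -> None:
        # Simulate the failure case discussed above.
        if self._fail_on_flush:
            raise ValueError("malformed annotation")

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "SketchWriter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        try:
            self.flush()
        finally:
            # Runs whether or not flush() raised, so the handle is released.
            self.close()


failing = SketchWriter(fail_on_flush=True)
try:
    with failing:
        pass
except ValueError:
    pass

healthy = SketchWriter()
with healthy:
    pass
```

The error from `flush` still propagates to the caller; the only change is that the handle is closed first.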
    self.flush()
    return MemoryCacheReader(
        table=self._table, batch_size=self._batch_size
    )
Does it make sense for the writer to continue to write after creating a reader like this? Should the writer's self._table be set to None or something?
PyArrow tables are immutable, so the memory reader is safe from the table being mutated. This is not the case for the file-based cache, though.
If we want to signal to the user / developer that a reader has been created and writing has ceased, then we could set the table to None. We would also need to maintain this flow on the file-based side, where we would either need to change file permissions or leave a flag in the cache .cfg file saying that writing is no longer allowed.
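A rough sketch of the "writing has ceased" signal for the in-memory case, under the assumption that an explicit flag is acceptable in place of setting the table to None. `SketchMemoryWriter`, `write_row`, `to_reader` and `_sealed` are all hypothetical names, and a tuple of rows stands in for the Arrow table.

```python
class SketchMemoryWriter:
    """Hypothetical writer that refuses writes once a reader exists."""

    def __init__(self) -> None:
        self._rows = []
        self._sealed = False

    def write_row(self, row: dict) -> None:
        if self._sealed:
            raise RuntimeError("writer is sealed: a reader was already created")
        self._rows.append(row)

    def to_reader(self) -> tuple:
        # Hand out a tuple snapshot and stop accepting further writes.
        self._sealed = True
        return tuple(self._rows)


writer = SketchMemoryWriter()
writer.write_row({"id": 1})
reader_view = writer.to_reader()

try:
    writer.write_row({"id": 2})
    rejected = False
except RuntimeError:
    rejected = True
```

The file-based equivalent would need the same flag persisted somewhere, for example in the .cfg file as mentioned above.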
    cfg_path.unlink()

    # delete empty cache directory
    path.rmdir()
All this is okay, I think, but if the config file is bad for some reason, FileCacheReader.load will fail, and if there are any extra files under path, path.rmdir will fail. Maybe that's what we want, i.e. if something unexpected happens, maybe we shouldn't delete the thing in the first place?
My intent was to fail if the file footprint did not match exactly what was created by the Writer 😁
Yeah, I think that sounds good.
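For reference, the fail-on-unexpected-footprint behaviour falls out of `Path.rmdir()`, which raises `OSError` when the directory is not empty. A minimal sketch follows; `delete_cache` and the file names `cache.cfg` and `stray.txt` are made up for illustration and are not the names used in this PR.

```python
import tempfile
from pathlib import Path


def delete_cache(path: Path, expected: list) -> None:
    """Hypothetical helper: remove only the files the writer is known to create."""
    for name in expected:
        (path / name).unlink()
    # Path.rmdir() raises OSError if anything unexpected is left behind.
    path.rmdir()


with tempfile.TemporaryDirectory() as tmp:
    # Footprint matches exactly: the directory is removed.
    clean = Path(tmp) / "clean"
    clean.mkdir()
    (clean / "cache.cfg").write_text("{}")
    delete_cache(clean, ["cache.cfg"])
    clean_removed = not clean.exists()

    # An extra file is present: deletion stops and the stray file survives.
    dirty = Path(tmp) / "dirty"
    dirty.mkdir()
    (dirty / "cache.cfg").write_text("{}")
    (dirty / "stray.txt").write_text("not ours")
    try:
        delete_cache(dirty, ["cache.cfg"])
        dirty_raised = False
    except OSError:
        dirty_raised = True
    dirty_survived = (dirty / "stray.txt").exists()
```

Note that in the second case the known files are already gone by the time `rmdir` fails, so the cache is left partially deleted alongside the stray file.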
    class EmptyCacheError(Exception):
        def __init__(self):
            super().__init__("cache contains no data")
This does not seem to be used anywhere?
Not yet, at least. I copied in the common code for all task types.
Broke out cache files from #855 #854 #853