Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
b422b80
webdataset schema finished, test cases not working
valerie-cal Mar 10, 2025
8184a1e
Merge branch '2.0' of https://github.com/nsaadhvi/codebase-deltacat i…
valerie-cal Mar 13, 2025
44e32a8
wds schema pytest and wds schema logic
valerie-cal Mar 13, 2025
fe348f8
edits to building wds schema
valerie-cal Mar 15, 2025
f8bf30f
Initial implementation of from_webdataset
valerie-cal Mar 31, 2025
7c5df3e
demo to load web data set
nsaadhvi Apr 3, 2025
7c8f880
fixed error with reading tar files and added sample tar files
valerie-cal Apr 3, 2025
dc348f3
finished pytests
valerie-cal Apr 4, 2025
1ee3ba2
Merge branch 'wds-schema' of https://github.com/nsaadhvi/codebase-del…
nsaadhvi Apr 4, 2025
0111b62
inconsistent jsons pytest added
valerie-cal Apr 4, 2025
124a3f6
comment out failing test
valerie-cal Apr 4, 2025
6ebcf18
restored failing test case
valerie-cal Apr 7, 2025
a843e07
Merge branch 'wds-schema' of https://github.com/nsaadhvi/codebase-del…
nsaadhvi Apr 7, 2025
5a060ac
create datasets in tmp_path
valerie-cal Apr 7, 2025
d8af0b4
added optional user batch_size input
nsaadhvi Apr 24, 2025
79c5cda
Merge branch 'wds-schema' of https://github.com/nsaadhvi/codebase-del…
nsaadhvi Apr 24, 2025
3511e5f
bird classification web data set demo
nsaadhvi Apr 24, 2025
4e642a0
webdataset schema finished, test cases not working
valerie-cal Mar 10, 2025
925fa67
wds schema pytest and wds schema logic
valerie-cal Mar 13, 2025
3c6ced0
edits to building wds schema
valerie-cal Mar 15, 2025
4900233
Initial implementation of from_webdataset
valerie-cal Mar 31, 2025
0e74062
fixed error with reading tar files and added sample tar files
valerie-cal Apr 3, 2025
d383439
finished pytests
valerie-cal Apr 4, 2025
a7e5b32
inconsistent jsons pytest added
valerie-cal Apr 4, 2025
2accfe9
comment out failing test
valerie-cal Apr 4, 2025
4b239e6
restored failing test case
valerie-cal Apr 7, 2025
ccc5440
create datasets in tmp_path
valerie-cal Apr 7, 2025
c328f2b
add image binary column to dataset and schema
valerie-cal May 1, 2025
acbddd4
resolved merge conflicts with 2.0 branch, wds tests passing
nsaadhvi May 1, 2025
1795a19
Throw error for image binary and batch row mismatch
nsaadhvi May 1, 2025
d154e17
edit demo to process images with binary column
nsaadhvi May 2, 2025
5fc1ca3
normalize function and media instead of image
valerie-cal Jun 15, 2025
0a2aaf1
merge
valerie-cal Jun 15, 2025
6bd5f1f
cleanup changes
valerie-cal Jun 15, 2025
5aac9c1
Update wds_demo.py with comments
NeeralBhalgat Jun 16, 2025
ca0f846
Add data read/write tests and remove internal attribute tests
valerie-cal Aug 17, 2025
0e3623d
Merge remote-tracking branch 'origin/2.0' into wds-schema
valerie-cal Aug 17, 2025
860a9d2
Fix accidental executable bit changes
valerie-cal Aug 17, 2025
784b650
Fix accidental executable bit changes
valerie-cal Aug 17, 2025
b8e819d
Manually resolve remaining merge conflicts
valerie-cal Aug 17, 2025
6a5b7df
Addressing comments, to be continued
valerie-cal Aug 18, 2025
ec52d2f
Improvements, to be continued
valerie-cal Aug 18, 2025
b5d638d
Cleaned and upgraded webdataset test suites
025rhu Aug 18, 2025
0f270bd
Removed static tar files used in old WebDataset test suite
025rhu Aug 18, 2025
1be6b29
Removed unnecessary set up code in TestFromWebDataset
025rhu Aug 18, 2025
787cf3f
Using WDS reader for tests, added intra-batch schema merge handling l…
025rhu Aug 21, 2025
c654e68
Make a WebDatasetReader class for Dataset's from_webdataset, and adde…
025rhu Sep 3, 2025
f3401d2
Upgraded inconsistent schema handling test, and added non-lossy promo…
025rhu Sep 3, 2025
90ae6e0
Cleaned code and update a doc string
025rhu Sep 3, 2025
bb86af5
Passing linter
025rhu Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions deltacat/examples/experimental/rivulet/pytorch_demo.ipynb
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"id": "2fb18b4d46a9548",
"metadata": {},
"source": [
"# PyTorch Demo: Sentiment Analysis and Question Detection with Rivulet Dataset\n",
"\n",
Expand All @@ -13,14 +14,16 @@
"- **Pytorch Integration:** Easily allows passing of data between pytorch models and transformers.\n",
"- **Non-Destructive Transformation:** Transforms the data (e.g., adding sentiment and question classification) without modifying the original dataset.\n",
"- **Exporting Data:** Exports the modified dataset to supported formats such as Parquet and JSON for further analysis."
],
"id": "2fb18b4d46a9548"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"collapsed": true
},
"cell_type": "code",
"outputs": [],
"source": [
"import torch\n",
"from typing import List\n",
Expand All @@ -29,14 +32,14 @@
"import pathlib\n",
"import pyarrow as pa\n",
"import pyarrow.csv as csv"
],
"id": "initial_id",
"outputs": [],
"execution_count": null
]
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"id": "51a2ddaed83da5f3",
"metadata": {},
"outputs": [],
"source": [
"# Load tokenizer and model for sentiment analysis\n",
"sentiment_tokenizer = AutoTokenizer.from_pretrained(\"distilbert-base-uncased-finetuned-sst-2-english\")\n",
Expand All @@ -46,14 +49,14 @@
"question_tokenizer = AutoTokenizer.from_pretrained(\"shahrukhx01/question-vs-statement-classifier\")\n",
"question_model = AutoModelForSequenceClassification.from_pretrained(\"shahrukhx01/question-vs-statement-classifier\")\n",
"question_model.eval()"
],
"id": "51a2ddaed83da5f3",
"outputs": [],
"execution_count": null
]
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"id": "b74792a57b9b28c1",
"metadata": {},
"outputs": [],
"source": [
"# Create a rivulet dataset using the CSV file\n",
"cwd = pathlib.Path.cwd()\n",
Expand All @@ -65,29 +68,29 @@
" merge_keys=\"msg_id\"\n",
")\n",
"ds.print(num_records=10)"
],
"id": "b74792a57b9b28c1",
"outputs": [],
"execution_count": null
]
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"id": "1b90411fd69378e9",
"metadata": {},
"outputs": [],
"source": [
"# define a new schema with fields for pytorch classification\n",
"ds.add_fields([\n",
" (\"msg_id\", dc.Datatype.int64()),\n",
" (\"sentiment\", dc.Datatype.float()),\n",
" (\"is_question\", dc.Datatype.float())\n",
"], schema_name=\"message_classifier\", merge_keys=[\"msg_id\"])"
],
"id": "1b90411fd69378e9",
"outputs": [],
"execution_count": null
]
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"id": "587f17e09e5d306a",
"metadata": {},
"outputs": [],
"source": [
"# compute classification values and update records in dataset\n",
"def compute_sentiments(batch: pa.RecordBatch) -> List[float]:\n",
Expand Down Expand Up @@ -134,21 +137,18 @@
"\n",
"dataset_writer.flush()\n",
"print(\"Sentiment and is_question values have been computed and updated in the dataset.\")"
],
"id": "587f17e09e5d306a",
"outputs": [],
"execution_count": null
]
},
{
"metadata": {},
"cell_type": "code",
"execution_count": null,
"id": "8ef2dd2a1bc4e66a",
"metadata": {},
"outputs": [],
"source": [
"# export to a supported format (JSON, PARQUET, FEATHER)\n",
"ds.export(file_uri=\"./output.json\", format=\"json\")"
],
"id": "8ef2dd2a1bc4e66a",
"outputs": [],
"execution_count": null
]
}
],
"metadata": {
Expand Down
101 changes: 101 additions & 0 deletions deltacat/examples/experimental/rivulet/wds_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import torch
from deltacat.storage.rivulet import Dataset
import pyarrow as pa
from typing import List
from PIL import Image
import io
from deltacat.storage.rivulet.schema.schema import Datatype
from transformers import AutoImageProcessor, AutoModelForImageClassification


# Alternative sample archive (ImageNet shard) — swap in to run on real photos.
# tar_path = "deltacat/tests/test_utils/resources/imagenet1k-train-0000.tar"
tar_path = "deltacat/tests/test_utils/resources/nestedjson.tar"

# Build a rivulet Dataset directly from the WebDataset tar file; the schema
# is inferred from the samples inside the archive.
ds = Dataset.from_webdataset(
    name="bird_species_test",  # Name of the dataset
    file_uri=tar_path,  # Location of the tar file
    merge_keys="filename",  # Merge batches using the 'filename' key
)

# Print the available fields in the dataset
print(ds.fields)

# Load the image processor and classification model from HuggingFace
processor = AutoImageProcessor.from_pretrained("chriamue/bird-species-classifier")
model = AutoModelForImageClassification.from_pretrained(
    "chriamue/bird-species-classifier"
)
model.eval()  # inference mode: disables dropout / batch-norm updates


# Function to classify bird species from a record batch
def compute_bird_species(batch: pa.RecordBatch) -> List[str]:
    """Classify the bird species for every decodable image in *batch*.

    Decodes the ``image_binary`` column into RGB PIL images, runs all of
    them through the HuggingFace classifier in a single forward pass, and
    returns the predicted human-readable labels.

    NOTE(review): images whose bytes fail to decode are skipped, so the
    returned list can be shorter than the batch — confirm callers handle
    a shorter result.
    """
    # Decode the raw image bytes; log and skip any corrupt entries.
    decoded_images = []
    for img_binary in batch.column("image_binary").to_pylist():
        try:
            decoded_images.append(Image.open(io.BytesIO(img_binary)).convert("RGB"))
        except Exception as e:
            print(f"Error reading image: {e}")

    # Nothing decodable: no predictions to make.
    if not decoded_images:
        return []

    # Preprocess and classify the whole batch in one forward pass.
    model_inputs = processor(images=decoded_images, return_tensors="pt")
    with torch.no_grad():  # inference only; gradients are not needed
        logits = model(**model_inputs).logits

    # Map the argmax indices to human-readable class labels.
    predicted_ids = torch.argmax(logits, dim=1).tolist()
    return [model.config.id2label[idx] for idx in predicted_ids]


# Add new fields to the dataset: filename and predicted bird species
ds.add_fields(
    [
        ("filename", Datatype.string()),  # String type for filename
        ("bird_species", Datatype.string()),  # String type for predicted label
    ],
    schema_name="bird_species_classifier",
    merge_keys=["filename"],
)  # Schema name and merge key

# Initialize writer to store output under the new schema
dataset_writer = ds.writer(schema_name="bird_species_classifier")

# Iterate over each Arrow batch in the dataset
for batch in ds.scan().to_arrow():
    print(batch)  # Print the batch contents
    filenames = batch.column("filename").to_pylist()  # Extract filenames
    bird_labels = compute_bird_species(batch)  # Run classification on batch

    rows_to_write = []  # Prepare rows to be written
    if bird_labels:
        # Create a list of dictionaries combining filename and predicted species.
        # NOTE(review): compute_bird_species drops images that fail to decode,
        # so bird_labels can be shorter than filenames; zip() then silently
        # pairs labels with the wrong files — confirm all inputs decode.
        rows_to_write = [
            {"filename": fname, "bird_species": bird_species}
            for fname, bird_species in zip(filenames, bird_labels)
        ]
        print("ROWS", rows_to_write)  # Print rows to be written
    dataset_writer.write(rows_to_write)  # Write the output to dataset

dataset_writer.flush()  # Persist all buffered rows to storage

# Export the results to a local JSON file
ds.export(file_uri="./bird_classification_species_predictions.json", format="json")

print("Bird species classification complete.")
78 changes: 75 additions & 3 deletions deltacat/experimental/storage/rivulet/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
from deltacat.experimental.storage.rivulet.reader.query_expression import (
QueryExpression,
)

from deltacat.experimental.storage.rivulet.reader.webdataset_reader import (
WebDatasetReader,
)
from deltacat.experimental.storage.rivulet.writer.dataset_writer import DatasetWriter
from deltacat.experimental.storage.rivulet.writer.memtable_dataset_writer import (
MemtableDatasetWriter,
Expand Down Expand Up @@ -479,6 +481,77 @@ def from_json(

return dataset

@classmethod
def from_webdataset(
    cls,
    name: str,
    file_uri: str,
    merge_keys: str | Iterable[str] = None,
    metadata_uri: Optional[str] = None,
    schema_mode: str = "union",
    batch_size: Optional[int] = 1,
    filesystem: Optional[pyarrow.fs.FileSystem] = None,
    namespace: str = DEFAULT_NAMESPACE,
) -> "Dataset":
    """
    Create a Dataset from a single webdataset tar file.

    TODO: Add support for reading directories with multiple WDS files.

    Args:
        name: Unique identifier for the dataset.
        file_uri: Path to a single webdataset file.
        merge_keys: Fields to specify as merge keys for future 'zipper merge'
            operations on the dataset.
        metadata_uri: Base URI for the dataset, where dataset metadata is
            stored. If not specified, will be placed in
            ${dirname(file_uri)}/riv-meta on the data file's filesystem.
        schema_mode: Currently ignored as this is for a single file.
        batch_size: Number of samples per record batch read from the tar file.
        filesystem: Optional explicit pyarrow filesystem; inferred from the
            URIs when omitted.
        namespace: Namespace the dataset is registered under.

    Returns:
        Dataset: New dataset instance with the schema automatically inferred
        from the tar file.

    Raises:
        ValueError: If file_uri and metadata_uri resolve to different
            filesystem types.
    """
    # TODO: integrate this with filesystem from deltacat catalog
    file_uri, file_fs = FileStore.filesystem(file_uri, filesystem=filesystem)
    if metadata_uri is None:
        metadata_uri = posixpath.join(posixpath.dirname(file_uri), "riv-meta")
        # Bug fix: the default metadata location lives beside the data file,
        # so it shares the data file's filesystem. Previously metadata_fs was
        # left unbound on this branch, raising NameError in the check below.
        metadata_fs = file_fs
    else:
        metadata_uri, metadata_fs = FileStore.filesystem(
            metadata_uri, filesystem=filesystem
        )

    # TODO: when integrating deltacat consider if we can support multiple filesystems
    if file_fs.type_name != metadata_fs.type_name:
        raise ValueError(
            "File URI and metadata URI must be on the same filesystem."
        )

    # Read the WebDataset into a PyArrow Table
    wds_parser = WebDatasetReader(
        name=name,
        file_uri=file_uri,
        merge_keys=merge_keys,
        schema_mode=schema_mode,
        batch_size=batch_size,
        namespace=namespace,
    )
    pyarrow_table = wds_parser.to_pyarrow()

    # Derive the rivulet schema from the Arrow schema of the parsed table.
    dataset_schema = Schema.from_pyarrow(
        pyarrow_table.schema, merge_keys=merge_keys
    )

    # Create the Dataset and write the parsed batches into it.
    dataset = cls(
        dataset_name=name,
        metadata_uri=metadata_uri,
        schema=dataset_schema,
        filesystem=file_fs,
        namespace=namespace,
    )

    writer = dataset.writer()
    writer.write(pyarrow_table.to_batches())
    writer.flush()
    return dataset

@classmethod
def from_csv(
cls,
Expand Down Expand Up @@ -522,7 +595,7 @@ def from_csv(
)

# Read the CSV file into a PyArrow Table
table = pyarrow.csv.read_csv(file_uri, filesystem=file_fs)
table = pyarrow.csv.read_csv(file_uri)
pyarrow_schema = table.schema

# Create the dataset schema
Expand Down Expand Up @@ -718,7 +791,6 @@ def writer(
:return: new dataset writer with a schema at the conjunction of the given schemas
"""
schema_name = schema_name or ALL

return MemtableDatasetWriter(
self._file_provider, self.schemas[schema_name], self._locator, file_format
)
Expand Down
Loading
Loading