Integrate Data API into execution pipeline #66

Merged
JyotinderSingh merged 5 commits into main from data-api-2 on Mar 6, 2026
Conversation

@JyotinderSingh (Collaborator) commented Mar 4, 2026

Integrate Data API into execution pipeline

Depends on #65

Summary

  • Integrates the Data class into the full execution pipeline: from the @run() decorator through artifact preparation, serialization, and pod-side resolution
  • Adds a volumes parameter to @keras_remote.run() for mounting data at fixed paths on the remote pod
  • Supports passing Data objects as function arguments (including nested in lists/tuples/dicts), which are transparently resolved to filesystem paths on the pod
  • Excludes local Data paths from the context zip to avoid uploading large datasets twice
  • Adds pod-side data download and resolution in the remote runner

New volumes parameter on @keras_remote.run()

The run() decorator now accepts a volumes dict mapping absolute mount paths to Data objects:

@keras_remote.run(
    accelerator="v4-8",
    volumes={
        "/data": Data("./dataset/"),
        "/config": Data("gs://my-bucket/configs/"),
    },
)
def train():
    # /data and /config are populated before this function runs
    import os
    files = os.listdir("/data")
    ...

Validation:

  • volumes must be a dict (raises TypeError otherwise)
  • Keys must be strings starting with / — absolute mount paths (raises ValueError otherwise)
  • Values must be Data instances (raises TypeError otherwise)
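The checks above can be sketched as follows (illustrative only: Data here is a stand-in class and validate_volumes is a hypothetical helper, not the decorator's actual code):

```python
class Data:
    """Stand-in for keras_remote's Data class (illustrative only)."""
    def __init__(self, path):
        self.path = path

def validate_volumes(volumes):
    """Sketch of the volumes validation described in the PR summary."""
    if not isinstance(volumes, dict):
        raise TypeError(f"volumes must be a dict, got {type(volumes).__name__}")
    for mount_path, data in volumes.items():
        # Keys must be absolute mount paths on the pod
        if not isinstance(mount_path, str) or not mount_path.startswith("/"):
            raise ValueError(f"volume keys must be absolute paths, got {mount_path!r}")
        # Values must wrap a local path or GCS URI in a Data object
        if not isinstance(data, Data):
            raise TypeError(f"volume values must be Data instances, got {type(data).__name__}")
```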

Two ways to pass data to remote functions

Method 1: Data as function arguments

Data objects passed as positional or keyword arguments are transparently replaced with local filesystem paths on the remote pod. The user's function receives plain strings:

@keras_remote.run(accelerator="v4-8")
def train(data_dir, config_path, lr=0.01):
    # data_dir and config_path are plain string paths on the pod
    import json
    import pandas as pd
    df = pd.read_csv(f"{data_dir}/train.csv")
    with open(config_path) as f:
        cfg = json.load(f)
    return {"rows": len(df), "config": cfg}

result = train(Data("./my_dataset/"), Data("./config.json"), lr=0.001)

Data objects can be nested inside lists, tuples, and dicts at any depth:

@keras_remote.run(accelerator="cpu")
def process(datasets):
    import os
    return [sorted(os.listdir(d)) for d in datasets]

result = process(datasets=[Data("./set_a/"), Data("./set_b/")])

Method 2: Volumes (fixed-path mounts)

Data is downloaded to a specific absolute path on the pod before function execution:

@keras_remote.run(
    accelerator="cpu",
    volumes={"/data": Data("./dataset/")},
)
def train():
    import os
    files = os.listdir("/data")  # data is available at /data
    return files

Mixed usage

Both methods can be combined:

@keras_remote.run(
    accelerator="cpu",
    volumes={"/weights": Data("./weights/")},
)
def train(config_path, lr=0.001):
    import json
    import os
    with open(config_path) as f:
        cfg = json.load(f)
    has_weights = os.path.isdir("/weights")
    return {"config": cfg, "lr": lr, "has_weights": has_weights}

result = train(Data("./config.json"), lr=0.01)

End-to-end data flow

  1. User wraps paths with Data(...) and passes them as function args or in volumes
  2. Client-side (_prepare_artifacts in execution.py):
    • Computes content hash for each local Data object
    • Uploads to GCS at {namespace}/data-cache/{hash}/ with content-addressed caching
    • Replaces Data objects with serializable __data_ref__ dicts in args/kwargs
    • Excludes Data paths from the context zip to avoid double-uploading
  3. Payload is serialized with volumes list and data-ref-replaced args/kwargs
  4. Pod-side (remote_runner.py):
    • resolve_volumes() downloads volume data to their fixed mount paths
    • resolve_data_refs() recursively resolves __data_ref__ dicts in args/kwargs to local paths
    • For single files, returns the file path directly (not a containing directory)
  5. User function receives plain str paths — completely unaware of the Data abstraction
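For illustration, a serialized ref might look roughly like the dict below. The exact schema is internal to the PR; the field names shown (gcs_uri, mount_path) are inferred from the review discussion, not taken from the implementation:

```python
# Hypothetical shape of a serialized data reference (field names are
# assumptions inferred from the review comments, not the actual schema).
data_ref = {
    "__data_ref__": True,
    "gcs_uri": "gs://my-bucket/my-namespace/data-cache/<content-hash>/",
    "mount_path": None,  # set to the mount path only for volume-backed data
}
```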

Execution pipeline changes

JobContext dataclass — new field:

volumes: Optional[dict] = None  # {mount_path: Data}

_prepare_artifacts() now:

  1. Collects exclude paths for Data objects inside the caller's working directory
  2. Uploads each volume's Data via upload_data() and creates data ref dicts
  3. Scans args/kwargs recursively for Data objects via extract_data_refs()
  4. Uploads each arg-based Data and creates ref dicts
  5. Replaces Data objects in args/kwargs with ref dicts via replace_data_with_refs()
  6. Passes exclude_paths to zip_working_dir() and volumes to save_payload()

Packager changes

New functions:

  • extract_data_refs(args, kwargs) — recursively scans for Data objects at any nesting depth, returning (data_obj, position_path) tuples
  • replace_data_with_refs(args, kwargs, ref_map) — recursively replaces Data objects with their serializable ref dicts

Updated functions:

  • zip_working_dir() — now accepts exclude_paths set to skip Data-referenced paths
  • save_payload() — now accepts optional volumes list, included in the serialized payload
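A minimal sketch of what the two new functions might do. The names match the PR, but the signatures, the position-path encoding, and the Data stand-in are assumptions:

```python
class Data:
    """Stand-in for keras_remote's Data class (illustrative only)."""
    def __init__(self, path):
        self.path = path

def extract_data_refs(args, kwargs):
    """Recursively collect (data_obj, position_path) tuples at any depth."""
    found = []
    def scan(value, path):
        if isinstance(value, Data):
            found.append((value, path))
        elif isinstance(value, (list, tuple)):
            for i, v in enumerate(value):
                scan(v, path + (i,))
        elif isinstance(value, dict):
            for k, v in value.items():
                scan(v, path + (k,))
    scan(list(args), ("args",))
    scan(kwargs, ("kwargs",))
    return found

def replace_data_with_refs(args, kwargs, ref_map):
    """Swap each Data object for its serializable ref dict, preserving shape."""
    def replace(value):
        if isinstance(value, Data):
            return ref_map[id(value)]
        if isinstance(value, list):
            return [replace(v) for v in value]
        if isinstance(value, tuple):
            return tuple(replace(v) for v in value)
        if isinstance(value, dict):
            return {k: replace(v) for k, v in value.items()}
        return value
    return replace(list(args)), replace(kwargs)
```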

Remote runner changes

New functions on the pod side:

  • resolve_volumes(volume_refs, storage_client) — downloads each volume's data from GCS to its declared mount path
  • resolve_data_refs(args, kwargs, storage_client) — recursively walks args/kwargs, downloads data from GCS to DATA_DIR/{counter}/, replaces ref dicts with local paths
  • _download_data(ref, target_dir, storage_client) — downloads blobs from a GCS URI prefix, skipping .cache_marker sentinels and directory markers
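The recursive resolution could be sketched as below, with the GCS download abstracted behind a callable so the walk itself is testable. The real function takes a storage_client and writes under DATA_DIR/{counter}/ as described above; everything else here is an assumption:

```python
def resolve_data_refs(args, kwargs, download):
    """Walk args/kwargs, replacing __data_ref__ dicts with local paths.

    `download(ref, counter)` stands in for the GCS fetch into
    DATA_DIR/{counter}/ and returns the resulting local path.
    """
    counter = [0]
    def resolve(value):
        # Ref dicts are checked before ordinary dicts so they are not recursed into
        if isinstance(value, dict) and value.get("__data_ref__"):
            local = download(value, counter[0])
            counter[0] += 1
            return local
        if isinstance(value, list):
            return [resolve(v) for v in value]
        if isinstance(value, tuple):
            return tuple(resolve(v) for v in value)
        if isinstance(value, dict):
            return {k: resolve(v) for k, v in value.items()}
        return value
    return resolve(list(args)), resolve(kwargs)
```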

@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the remote execution capabilities by integrating a new Data API. It allows users to seamlessly manage data for remote jobs, whether by passing data directly as function arguments or by mounting them as persistent volumes. The changes automate the lifecycle of data, from local packaging and GCS upload to remote download and resolution, while also optimizing the transfer size by intelligently excluding data from the zipped working directory.

Highlights

  • Data Object Integration: Introduced comprehensive support for Data objects throughout the remote execution pipeline, enabling them to be passed as function arguments or mounted as volumes.
  • Volume Mounting: Implemented functionality to define Data objects as volumes with specified absolute mount paths, ensuring data is downloaded to these locations on the remote pod before execution.
  • Argument Handling: Enabled Data objects to be used directly within function arguments and keyword arguments, including nested structures like lists and dictionaries, with automatic resolution to local paths on the remote runner.
  • Optimized Packaging: Modified the working directory zipping process to exclude local paths referenced by Data objects, preventing unnecessary bloating of the context package sent to the remote environment.
  • Remote Data Resolution: The remote runner now includes logic to download and resolve Data references from Google Cloud Storage (GCS) to local temporary directories or specified mount paths before executing the user's function.


Changelog
  • keras_remote/backend/execution.py
    • Imported _make_data_ref for creating data references.
    • Added an optional volumes dictionary to JobContext to store data volumes.
    • Updated JobContext.from_params to accept volumes as an argument.
    • Introduced _maybe_exclude function to identify and exclude data paths from the working directory zip.
    • Refactored _prepare_artifacts to process Data objects from arguments and volumes, upload them to GCS, create references, and exclude their local paths from the zipped context.
    • Modified save_payload to accept and include volumes in the serialized payload.
    • Updated zip_working_dir to accept an exclude_paths argument.
  • keras_remote/core/core.py
    • Imported the Data class.
    • Added an optional volumes parameter to the run decorator.
    • Implemented validation for the volumes argument, ensuring it's a dictionary with absolute mount paths and Data instances.
    • Passed the volumes argument through to the backend execution functions (_execute_on_gke, _execute_on_pathways) and JobContext.from_params.
  • keras_remote/core/core_test.py
    • Adjusted the index for retrieving env_vars in mock_exec.call_args in several tests to account for the new volumes argument.
  • keras_remote/runner/remote_runner.py
    • Defined DATA_DIR as a temporary directory for local data downloads.
    • Modified run_gcs_mode to include steps for resolving Data references and volumes before function execution.
    • Added resolve_volumes function to download data associated with specified mount paths.
    • Added resolve_data_refs function to recursively replace Data reference dictionaries in arguments and keyword arguments with local file paths.
    • Implemented _download_data to handle downloading data from GCS to local directories, including handling directory structures and skipping cache markers.
  • keras_remote/runner/remote_runner_test.py
    • Imported new functions: _download_data, resolve_data_refs, resolve_volumes.
    • Added TestDownloadData class with tests for downloading files, skipping markers, and creating subdirectories.
    • Added TestResolveDataRefs class with tests for replacing references with paths, handling nested references, single files, and preserving non-reference dictionaries.
    • Added TestResolveVolumes class with tests for downloading to mount paths and handling multiple volumes.
    • Updated _setup_gcs_test and _run_gcs_mode to support volumes in test setups.
    • Added new tests in TestRunGcsMode to verify Data reference and volume resolution before execution, and to confirm original behavior for calls without Data objects.
  • keras_remote/utils/packager.py
    • Imported the Data class.
    • Enhanced zip_working_dir to accept and utilize an exclude_paths argument, preventing specified files or directories from being included in the zip.
    • Modified save_payload to optionally include volumes in the serialized payload.
    • Added extract_data_refs function to recursively find Data objects within function arguments and keyword arguments.
    • Introduced _scan_for_data as a helper for recursive scanning of data objects.
    • Added replace_data_with_refs function to substitute Data objects with serializable reference dictionaries.
    • Implemented _replace_in_value as a helper for recursive replacement of data objects.
  • keras_remote/utils/packager_test.py
    • Imported Data, extract_data_refs, and replace_data_with_refs.
    • Updated _zip_and_list to accept exclude_paths for testing zip exclusions.
    • Added tests in TestZipWorkingDir for excluding directories, single files, and multiple paths.
    • Modified _save_and_load to accept volumes for payload testing.
    • Added tests in TestSavePayload to verify that volumes are correctly included or excluded from the payload.
    • Added TestExtractDataRefs class with tests for extracting data references from various argument structures.
    • Added TestReplaceDataWithRefs class with tests for replacing data objects with references in arguments and keyword arguments.

@gemini-code-assist bot left a comment
Code Review

This pull request integrates a new Data API to handle data dependencies for remote function execution, allowing Data objects to be passed as function arguments and used as volumes. While the implementation is thorough, a high-severity path traversal vulnerability was identified in the _download_data function in remote_runner.py, which could allow an attacker to overwrite arbitrary files on the remote execution pod. Additionally, there are correctness issues in the data download logic on the remote runner that could cause problems with certain GCS paths, and a minor refactoring is suggested for improved readability.

@divyashreepathihalli divyashreepathihalli left a comment


Thank you for the PR - just reviewed remote_runner and packager for now

# Volume-mounted data refs are handled by Kubernetes, skip download
if obj.get("mount_path") is not None:
    return obj
local_dir = os.path.join(DATA_DIR, str(counter))
Collaborator

Every time _resolve encounters a Data reference, it increments the counter and blindly initiates a fresh download. If a user passes the exact same dataset multiple times (e.g., train(dataset=Data("./cache"), eval=Data("./cache"))), the remote runner will download the entire, potentially gigabyte-sized, bucket multiple times to /tmp/data/0 and /tmp/data/1. We should wrap this in a download_cache = {} dictionary keyed by obj["gcs_uri"] at the top of resolve_data_refs.
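The suggested memoization could be sketched like this (a hypothetical helper; make_cached_download and the downloader signature are assumptions, not code from the PR, where the fix is tracked separately):

```python
def make_cached_download(download):
    """Memoize downloads by gcs_uri so repeated refs reuse one local copy."""
    cache = {}
    def cached(ref, counter):
        uri = ref["gcs_uri"]
        if uri not in cache:
            # First sighting of this URI: actually fetch it
            cache[uri] = download(ref, counter)
        return cache[uri]
    return cached
```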

JyotinderSingh (Collaborator, Author) replied:

Thanks for catching this, but since it doesn't affect correctness we can tackle it separately in another PR for the sake of simplicity. Created #69 for tracking.


blobs = bucket.list_blobs(prefix=prefix + "/")
count = 0
for blob in blobs:
Collaborator

This loop executes synchronously. Downloading a framework dataset consisting of 100,000 tiny .png or audio files will take a long time and may even exceed the pod start timeout due to per-request GCS HTTP latency. Consider wrapping the download_to_filename call in a concurrent.futures.ThreadPoolExecutor to run batches of downloads concurrently.
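The suggested fan-out could be sketched as follows (download_all and download_one are hypothetical names, with download_one standing in for blob.download_to_filename; the actual fix is tracked separately):

```python
from concurrent.futures import ThreadPoolExecutor

def download_all(blobs, download_one, max_workers=16):
    """Run per-blob downloads concurrently on a thread pool.

    Thread-based concurrency suits this workload because each download
    spends most of its time waiting on GCS HTTP I/O, not on the CPU.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(download_one, b) for b in blobs]
        # result() re-raises any download error, preserving fail-fast behavior
        return [f.result() for f in futures]
```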

JyotinderSingh (Collaborator, Author) replied:

Makes sense, but let's handle this in a separate change since it doesn't affect correctness. It will keep this PR's size manageable. Created #70 for tracking.

@divyashreepathihalli (Collaborator)

not sure why the rest of the checks are not running

Base automatically changed from data-api-1 to main March 6, 2026 05:09
Wires Data objects through packager, core API, execution backend, and
remote runner. Supports Data as function args (including nested in
lists/dicts), volumes with fixed mount paths, and excludes data paths
from working directory zips.
@JyotinderSingh JyotinderSingh merged commit a4569fe into main Mar 6, 2026
4 checks passed
@JyotinderSingh JyotinderSingh deleted the data-api-2 branch March 6, 2026 05:13