
Support @parallel on Kubernetes with jobsets #1744

Closed
wants to merge 1 commit

Conversation

shrinandj
Contributor

This commit adds support for @parallel when flows are run `--with kubernetes`. Support for Argo workflows will follow in a separate commit.

A user can run a flow with the following:

    @step
    def start(self):
        self.next(self.parallel_step, num_parallel=3)

    @kubernetes(cpu=1, memory=512)
    @parallel
    @step
    def parallel_step(self):
        ...

Testing Done:

  • Ran a flow with @parallel on Kubernetes and verified that it works correctly.
  • Ran a flow without @parallel on Kubernetes and verified that it works as expected.
  • Verified that a jobset-based @parallel step is scaled down if the user kills the run with Ctrl-C.
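The Ctrl-C behavior in the last test can be sketched as follows. This is a hypothetical illustration, not Metaflow's actual code: `wait_for_jobset` and `delete_jobset` stand in for the real Kubernetes API calls.

```python
# Hypothetical sketch of the Ctrl-C scale-down verified above: the local
# runtime traps KeyboardInterrupt while waiting on the jobset and deletes
# it before exiting, so no orphaned pods keep running.
def run_with_cleanup(wait_for_jobset, delete_jobset, name):
    """Wait on a jobset; on Ctrl-C, tear the jobset down before re-raising."""
    try:
        return wait_for_jobset(name)
    except KeyboardInterrupt:
        delete_jobset(name)  # scale down / remove the jobset
        raise

deleted = []

def fake_wait(name):
    raise KeyboardInterrupt  # simulate the user pressing Ctrl-C mid-run

try:
    run_with_cleanup(fake_wait, deleted.append, "mf-jobset-123")
except KeyboardInterrupt:
    pass
print(deleted)  # ['mf-jobset-123']
```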

@shrinandj shrinandj closed this Feb 19, 2024
@shrinandj shrinandj force-pushed the shri/support-jobsets branch from f391613 to 986e10e on February 19, 2024 18:07
@shrinandj shrinandj reopened this Feb 19, 2024
@romain-intel
Contributor

Mergeable anytime from my end -- no impact on core.

@shrinandj shrinandj force-pushed the shri/support-jobsets branch from 76b6310 to 00720cc on March 4, 2024 16:38
@shrinandj
Contributor Author

More testing will be done on this PR once #1745 is merged; distributed training workloads communicate with each other over an open port.

valayDave pushed a commit to valayDave/metaflow that referenced this pull request Mar 27, 2024
Linked to upstream : [Netflix#1744]

valayDave pushed a commit to valayDave/metaflow that referenced this pull request Apr 18, 2024
Implementation originates from [Netflix#1744]

Changes to original Implementation:
- Pass down the ports of jobsets
- Ensured that `ubf_context` is set correctly
- Ensured that `split-index` is set correctly based on the type of task (control vs worker)
- Fixed a bug in the RANK setting. The earlier implementation set `parallelism` on the created `replicatedJobs`.
    - In this implementation, we create a separate copy of the job for each replicated worker.
    - So retrieving the rank from the Kubernetes V1EnvVar.valueFrom (metadata.annotations['batch.kubernetes.io/job-completion-index']) won't work,
    - since `job-completion-index` relies on `parallelism` being set on the `job_spec`.
    Instead, we now set `RANK` statically from the index in the iterator that defines the jobs.

Link To Upstream : [COMING SOON!]
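The static RANK assignment described above can be illustrated with a small sketch. All names here are hypothetical, not Metaflow's actual internals: since each worker is its own replicated job, RANK comes from the enumeration index while the job specs are built, not from `job-completion-index`.

```python
# Illustrative sketch of the static RANK assignment: RANK is baked into each
# job's environment from the loop index, per the fix described above.
def build_job_specs(num_parallel):
    """One spec per task; in this sketch, rank 0 plays the control role."""
    specs = []
    for rank in range(num_parallel):
        specs.append({
            "name": f"job-{rank}",
            "env": {"RANK": str(rank)},  # set statically at spec-build time
        })
    return specs

ranks = [s["env"]["RANK"] for s in build_job_specs(3)]
print(ranks)  # ['0', '1', '2']
```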
valayDave pushed a commit to valayDave/metaflow that referenced this pull request Apr 18, 2024
valayDave added a commit to valayDave/metaflow that referenced this pull request Apr 18, 2024
@shrinandj shrinandj closed this Apr 18, 2024
valayDave pushed a commit to valayDave/metaflow that referenced this pull request May 8, 2024
valayDave pushed a commit to valayDave/metaflow that referenced this pull request May 8, 2024
valayDave pushed a commit to valayDave/metaflow that referenced this pull request May 10, 2024
valayDave added a commit to valayDave/metaflow that referenced this pull request May 13, 2024

Some notes about the implementation:

- No task-id annotations on pods, since we cannot construct the task-id dynamically at K8s container runtime.
- @catch is currently not supported with @parallel on Kubernetes.
- The jobset name is recorded in the task metadata.
- The jobset contains two job definitions: one for the control task and one for workers.
- The worker job is created with n-1 replicas.
- We construct the worker task-id deterministically using naming conventions and shell hacking.
- The jobset is considered running if at least one of its jobs is running.
- @retry works with jobsets.
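Two of the notes above can be sketched in a few lines. Both functions and the naming convention are hypothetical illustrations, not Metaflow's actual scheme.

```python
# Sketch of the deterministic worker task-id and the "running if any job
# runs" rule described above. Names and the id format are assumptions.
def worker_task_id(control_task_id, worker_index):
    # Derived purely from the control task-id and the worker's index, so a
    # pod can compute its own task-id in shell without a metadata lookup.
    return f"{control_task_id}-worker-{worker_index}"

def jobset_is_running(job_statuses):
    # The jobset counts as running if at least one of its jobs is running.
    return any(status == "running" for status in job_statuses)

print(worker_task_id("t-42", 2))                          # t-42-worker-2
print(jobset_is_running(["pending", "running", "failed"]))  # True
```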
valayDave added a commit to valayDave/metaflow that referenced this pull request May 20, 2024
- num_parallel <= 1 will NOT be supported to start with:
    - One core reason is that jobsets don't allow setting replicas to 0;
    - the jobset controller will mutate a jobset with replicas set to 0 so that replicas becomes 1.
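The guard implied above can be written as a small validation step. This is a hypothetical sketch, not Metaflow's actual validation code:

```python
# Since the jobset controller rewrites replicas=0 to replicas=1, values of
# num_parallel <= 1 are rejected up front rather than silently mis-scheduled.
def validate_num_parallel(num_parallel):
    if num_parallel <= 1:
        raise ValueError("@parallel on Kubernetes requires num_parallel > 1")
    return num_parallel
```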
valayDave added a commit to valayDave/metaflow that referenced this pull request May 20, 2024
- The implementation accounts for the Jobset CRD schema from v0.2.0.
    - The Jobset team changed the schema (renaming values) after v0.3.0.
    - Certain fields were added to the replicated-jobs status, and `ReplicatedJobsStatus` was renamed to `replicatedJobsStatus`.
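Tolerating the rename described above amounts to accepting either key when reading the status. A minimal sketch, assuming plain-dict status objects (the helper name is illustrative):

```python
# Read replicated-job status across JobSet CRD versions: pre-rename objects
# expose `ReplicatedJobsStatus`, post-rename ones use `replicatedJobsStatus`.
def replicated_jobs_status(jobset_status):
    for key in ("replicatedJobsStatus", "ReplicatedJobsStatus"):
        if key in jobset_status:
            return jobset_status[key]
    return []  # no status reported yet

old_style = {"ReplicatedJobsStatus": [{"name": "worker", "ready": 2}]}
new_style = {"replicatedJobsStatus": [{"name": "worker", "ready": 2}]}
print(replicated_jobs_status(old_style) == replicated_jobs_status(new_style))  # True
```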
savingoyal pushed a commit that referenced this pull request May 20, 2024