Add workflow concurrency control #6475

Merged
Sovietaced merged 13 commits into flyteorg:master from thomasjhuang:thhuang/internal-concurrency-commit on Aug 1, 2025
thomasjhuang commented May 29, 2025

Contributor

Tracking issue

This solves #5659 and also closes out #6309. The related flytekit change is flyteorg/flytekit#3267. Docs are added in unionai/unionai-docs#417.

Why are the changes needed?

This change provides workflow concurrency control at the project level. It is a bare-bones implementation: it guarantees that at most max_concurrency workflows are running at once, but it cannot guarantee that max_concurrency workflows are always running. The limit applies across all versions, and users specify it when instantiating a LaunchPlan:

concurrency_limited_lp = LaunchPlan.get_or_create(
    name="my_concurrent_lp",
    workflow=my_workflow,
    concurrency=ConcurrencyPolicy(
        max_concurrency=3,
        behavior=ConcurrencyLimitBehavior.SKIP,
    ),
)

What changes were proposed in this pull request?

The primary mechanism is fairly straightforward. Whenever we attempt to launch an execution, we make a db query to count the running executions for the NamedEntityIdentifier triplet (project/domain/wf_name). If the number of running executions has reached the concurrency threshold (max_concurrency), we immediately fail to create the execution.
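The check described above can be sketched in memory as follows. This is a minimal illustration under stated assumptions, not the flyteadmin implementation: `Execution`, `TERMINAL_PHASES`, `ConcurrencyLimitReached`, and `check_concurrency` are hypothetical names, the set of terminal phases is assumed, and the real check is a database query rather than a list scan.

```python
from dataclasses import dataclass
from enum import Enum


class Phase(Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    TIMED_OUT = "TIMED_OUT"


# Assumption: phases treated as finished. The authoritative set lives in flyteadmin.
TERMINAL_PHASES = {Phase.SUCCEEDED, Phase.FAILED, Phase.ABORTED, Phase.TIMED_OUT}


@dataclass(frozen=True)
class Execution:
    project: str
    domain: str
    name: str
    phase: Phase


class ConcurrencyLimitReached(Exception):
    """Hypothetical stand-in for the ResourceExhausted gRPC error."""


def check_concurrency(executions, project, domain, name, max_concurrency):
    """Return the active count, or raise if the limit is already reached."""
    active = sum(
        1
        for e in executions
        if (e.project, e.domain, e.name) == (project, domain, name)
        and e.phase not in TERMINAL_PHASES
    )
    # Reject when the limit is reached, so at most max_concurrency run at once.
    if active >= max_concurrency:
        raise ConcurrencyLimitReached(
            f"Concurrency limit ({max_concurrency}) reached for launch plan "
            f"{name}. Skipping execution."
        )
    return active
```

Because the check only rejects new launches and never queues them, it bounds the number of running executions from above but does nothing to keep that number at the limit.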

How was this patch tested?

Unit tests were added to execution_manager_test.go, but this was mainly tested internally at LinkedIn, since we are porting this feature externally. More testing is in progress on a local sandbox.

Labels

  • added: ConcurrencyPolicy and logic surrounding workflow concurrency management, as well as db migration to add an index on executions table for execution_phase.

Setup process

Screenshots

I was able to confirm locally that we receive the correct error message when we are running multiple workflows concurrently at limits of 1 and 2. Within the same project, running a workflow without limits works.

Error: rpc error: code = ResourceExhausted desc = Concurrency limit (1) reached for launch plan concurrency_limit_1. Skipping execution.
{"json":{},"level":"error","msg":"rpc error: code = ResourceExhausted desc = Concurrency limit (1) reached for launch plan concurrency_limit_1. Skipping execution.","ts":"2025-07-07T23:12:38-07:00"}

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

#5659 and #6309, will also add in flytekit PR as reference.

Docs link

  • Will add in docs

Summary by Bito

This pull request introduces a feature for managing workflow concurrency at the project level, allowing users to set limits on concurrent executions via a ConcurrencyPolicy. It includes logic to check running executions, enhances logging, and updates the database schema to support these changes, addressing issues #5659 and #6309.

codecov Bot commented May 29, 2025

Codecov Report

❌ Patch coverage is 63.63636% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.70%. Comparing base (c085cb5) to head (8208637).
⚠️ Report is 12 commits behind head on master.

Files with missing lines                            Patch %   Lines
flyteadmin/pkg/manager/impl/execution_manager.go    71.01%    15 Missing and 5 partials ⚠️
flyteadmin/pkg/repositories/config/migrations.go    0.00%     8 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6475      +/-   ##
==========================================
+ Coverage   58.67%   58.70%   +0.02%     
==========================================
  Files         938      938              
  Lines       71466    71690     +224     
==========================================
+ Hits        41933    42085     +152     
- Misses      26346    26413      +67     
- Partials     3187     3192       +5     
Flag                        Coverage Δ
unittests-datacatalog       59.03% <ø> (ø)
unittests-flyteadmin        56.14% <63.63%> (-0.09%) ⬇️
unittests-flytecopilot      39.56% <ø> (ø)
unittests-flytectl          64.72% <ø> (ø)
unittests-flyteidl          76.12% <ø> (ø)
unittests-flyteplugins      61.14% <ø> (+<0.01%) ⬆️
unittests-flytepropeller    55.06% <ø> (+0.22%) ⬆️
unittests-flytestdlib       64.02% <ø> (ø)


Sovietaced commented May 30, 2025

Member

Can you run make generate in flyteidl dir and make lint-fix in flyteadmin dir? There are some build failures

@davidmirror-ops

Contributor

@thomasjhuang thanks for your contribution. Also, please check out the failing DCO check.
Do you plan to set a specific phase when an execution is skipped? That may be beyond the scope of this PR, but it would be useful.
Also, please add some docs so users know how to leverage this in launch plans.

kumare3 commented Jun 13, 2025

Contributor

Taking a look

kumare3 commented Jun 13, 2025

Contributor

@troychiu can you please take a first look, you looked into this recently right?

@troychiu

Member

I don't really have context but would love to take a look.

kumare3 commented Jun 17, 2025

Contributor

@troychiu sorry wrong one. I meant the volcano PR. I can take a look at this one

@thomasjhuang thomasjhuang force-pushed the thhuang/internal-concurrency-commit branch from da55f4d to 532dfc4 Compare June 20, 2025 22:46
@thomasjhuang thomasjhuang force-pushed the thhuang/internal-concurrency-commit branch from 532dfc4 to 1e6ffd4 Compare July 8, 2025 05:08
@thomasjhuang thomasjhuang requested a review from ppiegaze as a code owner July 8, 2025 05:08
@thomasjhuang

Contributor Author

Okay, I've added a few things: docs and a unit test, and I ran make generate. make lint-fix didn't flag any issues in the code changes I've introduced here. Is there a reason why the checks are all pending? This is probably ready for review now. Please also check out the related flytekit change, flyteorg/flytekit#3267.

@davidmirror-ops @kumare3 @Sovietaced

@thomasjhuang thomasjhuang force-pushed the thhuang/internal-concurrency-commit branch from 4e02f0a to 235c354 Compare July 8, 2025 17:44
@davidmirror-ops

Contributor

@thomasjhuang thanks for this amazing contribution. Sorry about this but could you port the docs update to the new repo? https://github.com/unionai/docs I'll take point on updating this repo's docs folder to indicate it's no longer the content that ends up rendered in the docs site

@EngHabu EngHabu left a comment

Contributor

Looks great! (and simple 🥳 🥳 🥳 )... left a couple of comments

Comment thread flyteadmin/pkg/manager/impl/execution_manager.go Outdated
WHERE
executions.project = '${lpProject}'
AND executions.domain = '${lpDomain}'
AND launch_plans.name = '${lpName}'

Contributor

Should it also filter on launch_plans.project/domain? If you confirmed it uses the right index, then nvm

Contributor Author

Adding launch_plans.project/domain probably isn't needed because the join already ensures we only see launch plans from the same project/domain as the filtered executions. I ran a MySQL EXPLAIN and it uses the newly created index correctly.
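For illustration, the shape of the count query under discussion can be reproduced against an in-memory SQLite database. This is a rough sketch under loud assumptions: the schema is heavily simplified, the join column `launch_plan_id`, the `phase` column name, the index name, and the list of terminal phases are all guesses made for the example, and `count_active` is a hypothetical helper. It uses bound parameters in place of the `${...}` placeholders shown in the review excerpt.

```python
import sqlite3

# Assumption: a simplified schema; the real flyteadmin tables have more columns.
SCHEMA = """
CREATE TABLE launch_plans (
    id INTEGER PRIMARY KEY,
    project TEXT, domain TEXT, name TEXT, version TEXT
);
CREATE TABLE executions (
    id INTEGER PRIMARY KEY,
    project TEXT, domain TEXT, name TEXT, phase TEXT,
    launch_plan_id INTEGER REFERENCES launch_plans (id)
);
CREATE INDEX idx_executions_phase ON executions (phase);
"""

# Filter on the execution's project/domain and on the launch plan's name,
# so every version of the launch plan counts toward the same limit.
ACTIVE_COUNT_SQL = """
SELECT COUNT(*)
FROM executions
JOIN launch_plans ON executions.launch_plan_id = launch_plans.id
WHERE executions.project = ?
  AND executions.domain = ?
  AND launch_plans.name = ?
  AND executions.phase NOT IN ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT')
"""


def count_active(conn, project, domain, lp_name):
    """Count non-terminal executions for a launch plan name (hypothetical helper)."""
    return conn.execute(ACTIVE_COUNT_SQL, (project, domain, lp_name)).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO launch_plans VALUES (?, ?, ?, ?, ?)",
    [
        (1, "proj", "dev", "my_concurrent_lp", "v1"),
        (2, "proj", "dev", "my_concurrent_lp", "v2"),
        (3, "proj", "dev", "other_lp", "v1"),
    ],
)
conn.executemany(
    "INSERT INTO executions (project, domain, name, phase, launch_plan_id) "
    "VALUES (?, ?, ?, ?, ?)",
    [
        ("proj", "dev", "e1", "RUNNING", 1),
        ("proj", "dev", "e2", "SUCCEEDED", 2),
        ("proj", "dev", "e3", "QUEUED", 2),
        ("proj", "dev", "e4", "RUNNING", 3),
    ],
)

print(count_active(conn, "proj", "dev", "my_concurrent_lp"))  # prints 2
```

In this toy data the two active executions belong to different versions of the same launch plan, which is why they are counted together.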

@Sovietaced Sovietaced self-requested a review July 15, 2025 21:13

@Sovietaced Sovietaced left a comment

Member

I'm looking at porting a version of this to our fork and one thing I'm looking at is how this affects Flyte scheduler. Flyte scheduler is hardcoded to try and retry the execution creation up to 30 times if it fails, unless the gRPC status code in the error is codes.AlreadyExists.

I think the scheduler code will need to be updated to either give up on ResourceExhausted or we'll need a way to articulate this case through richer error details.
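The retry concern can be sketched as follows. This is a minimal illustration and not the Flyte scheduler code (which is written in Go): `Code`, `RpcError`, `create_with_retry`, and `NON_RETRYABLE` are hypothetical names, and any backoff between attempts is omitted. Only the limit of 30 attempts and the AlreadyExists exception come from the comment above; treating ResourceExhausted as non-retryable is the proposed change.

```python
from enum import Enum


class Code(Enum):
    OK = 0
    ALREADY_EXISTS = 1
    RESOURCE_EXHAUSTED = 2
    UNAVAILABLE = 3


class RpcError(Exception):
    """Hypothetical stand-in for a gRPC status error."""

    def __init__(self, code):
        super().__init__(code.name)
        self.code = code


MAX_ATTEMPTS = 30
# Proposed: stop retrying when the concurrency limit rejects the launch.
NON_RETRYABLE = frozenset({Code.ALREADY_EXISTS, Code.RESOURCE_EXHAUSTED})


def create_with_retry(create_execution, max_attempts=MAX_ATTEMPTS,
                      non_retryable=NON_RETRYABLE):
    """Call create_execution until it succeeds, hits a non-retryable code,
    or runs out of attempts. Returns (final_code, attempts_made)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            create_execution()
            return Code.OK, attempt
        except RpcError as err:
            if err.code in non_retryable or attempt == max_attempts:
                return err.code, attempt
```

With only AlreadyExists in the non-retryable set, a launch plan sitting at its limit would be retried all 30 times; adding ResourceExhausted makes the scheduler give up after the first rejection.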

thomasjhuang commented Jul 22, 2025

Contributor Author

> @thomasjhuang thanks for this amazing contribution. Sorry about this but could you port the docs update to the new repo? https://github.com/unionai/docs I'll take point on updating this repo's docs folder to indicate it's no longer the content that ends up rendered in the docs site

Got it, I've opened a PR here - unionai/unionai-docs#417

@thomasjhuang

Contributor Author

> I'm looking at porting a version of this to our fork and one thing I'm looking at is how this affects Flyte scheduler. Flyte scheduler is hardcoded to try and retry the execution creation up to 30 times if it fails, unless the gRPC status code in the error is codes.AlreadyExists.
>
> I think the scheduler code will need to be updated to either give up on ResourceExhausted or we'll need a way to articulate this case through richer error details.

Makes sense - this change will noticeably increase ResourceExhausted errors, especially if they are measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I also intend to add the execution phase SKIPPED so that the UI can indicate skipped executions rather than throw a resource error.

@flyte-bot

Collaborator

Bito Automatic Review Skipped - Files Excluded

Bito didn't auto-review this change because all changed files are in the exclusion list for automatic reviews. No action is needed if you didn't intend for the agent to review it. Otherwise, to manually trigger a review, type /review in a comment and save.
You can change the excluded files settings here, or contact your Bito workspace admin at eduardo@union.ai.

@Sovietaced

Member

We have an end-to-end test currently validating this functionality, and it seems like there might be a correctness error in the logic that looks for previous executions.

func TestFlyte_WorkflowConcurrencyLimits(t *testing.T) {

	lp := "e2e_singleton_workflow"
	client, err := config.Flyte.GetAdminClient(config.TestCtx)
	require.NoError(t, err, "getting flyte admin client")

	latestLaunchPlan := config.Flyte.FindLatestLaunchPlan(config.TestCtx, t, lp)

	t.Logf("Found most recent launch plan with version [%s]", latestLaunchPlan.GetId().GetVersion())

	_, err = client.AdminClient().CreateExecution(config.TestCtx, &pbadmin.ExecutionCreateRequest{
		Project: config.Flyte.Project,
		Domain:  config.Flyte.Domain,
		Spec: &pbadmin.ExecutionSpec{
			LaunchPlan: &pbcore.Identifier{
				ResourceType: pbcore.ResourceType_LAUNCH_PLAN,
				Project:      config.Flyte.Project,
				Domain:       config.Flyte.Domain,
				Name:         lp,
				Version:      latestLaunchPlan.GetId().GetVersion(),
			},
		},
	})
	require.NoError(t, err, "creating execution")

	// Creating a second execution should fail while the first is non-terminal
	_, err = client.AdminClient().CreateExecution(config.TestCtx, &pbadmin.ExecutionCreateRequest{
		Project: config.Flyte.Project,
		Domain:  config.Flyte.Domain,
		Spec: &pbadmin.ExecutionSpec{
			LaunchPlan: &pbcore.Identifier{
				ResourceType: pbcore.ResourceType_LAUNCH_PLAN,
				Project:      config.Flyte.Project,
				Domain:       config.Flyte.Domain,
				Name:         lp,
				Version:      latestLaunchPlan.GetId().GetVersion(),
			},
		},
	})
	require.Error(t, err, "second execution should be rejected by the concurrency limit")
	s, ok := status.FromError(err)
	require.True(t, ok, "should be a grpc status error")
	require.Equal(t, codes.ResourceExhausted, s.Code())
}

This is failing in our production environment where there is more load and I'm wondering if the state filtering isn't quite right.

@Sovietaced

Member

> Makes sense - this change will noticeably increase ResourceExhausted errors, especially if they are measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I also intend to add the execution phase SKIPPED so that the UI can indicate skipped executions rather than throw a resource error.

For this version can we at least treat it as non-retryable? That's what we're doing and it seems to be ok.

@thomasjhuang thomasjhuang force-pushed the thhuang/internal-concurrency-commit branch from 8a83072 to 0f57bdd Compare August 1, 2025 20:07
Signed-off-by: thomasjhuang <thomashuang63@gmail.com>
Co-authored-by: Haytham Abuelfutuh <haytham@afutuh.com>
@Sovietaced Sovietaced added the "added" label (Merged changes that add new functionality) and removed triage/discuss labels Aug 1, 2025
@thomasjhuang thomasjhuang force-pushed the thhuang/internal-concurrency-commit branch from 0f57bdd to 8208637 Compare August 1, 2025 20:34
@Sovietaced

Member

@EngHabu seemed happy with this. We've been using a variation of it in production for the past couple of weeks, so I think it's safe to land this.

@Sovietaced Sovietaced merged commit 640ad57 into flyteorg:master Aug 1, 2025
49 checks passed
welcome Bot commented Aug 1, 2025

Congrats on merging your first pull request! 🎉

popojk commented Aug 5, 2025

Contributor

> > I'm looking at porting a version of this to our fork and one thing I'm looking at is how this affects Flyte scheduler. Flyte scheduler is hardcoded to try and retry the execution creation up to 30 times if it fails, unless the gRPC status code in the error is codes.AlreadyExists.
> > I think the scheduler code will need to be updated to either give up on ResourceExhausted or we'll need a way to articulate this case through richer error details.
>
> Makes sense - this change will noticeably increase ResourceExhausted errors, especially if they are measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I also intend to add the execution phase SKIPPED so that the UI can indicate skipped executions rather than throw a resource error.

Hi @thomasjhuang @Sovietaced. Since I'm currently working on some scheduler-related issues, I can go ahead and open an issue for this and submit a PR.

8 participants