Skip to content

[dagster-graphql] Bulk terminatePipelineRun mutation #3441

Open
@mgasner

Description

@mgasner

Would be great to be able to terminate a list of pipeline runs in bulk, or all pipeline runs matching a filter.

cf.

from gql.transport.aiohttp import AIOHTTPTransport

transport = AIOHTTPTransport(url="http://localhost:8080/graphql")

client = Client(transport=transport, fetch_schema_from_transport=True)

runs_query = gql(
    """
    query enqueuedRuns ($limit: Int, $filter: PipelineRunsFilter) {
        pipelineRunsOrError (limit: $limit, filter: $filter) {
            ...on PipelineRuns {
                results {
                    runId
                }
                count
            }
        }
    }
    """
)

terminate_mutation = gql(
    """
    mutation terminateRun ($runId: String!) {
        terminatePipelineExecution (runId: $runId) {
            __typename
            ...on TerminatePipelineExecutionSuccess {
                run {
                    runId
                }
            }
            ...on TerminatePipelineExecutionFailure {
                run {
                    runId
                }
                message
            }
            ...on PipelineRunNotFoundError {
                runId
                message
            }
            ...on PythonError {
                message
                stack
            }
        }
    }
    """
)
enqueued_filter = {"statuses": ["QUEUED"], "pipelineName": "slack_digest_pipeline"}

import time

LIMIT = 25
start = time.time()
result = await client.execute_async(
    runs_query, variable_values={"limit": LIMIT, "filter": enqueued_filter}
)
count = result["pipelineRunsOrError"]["count"]
print(f"{count} runs remaining")

while count > 0:
    for run in result["pipelineRunsOrError"]["results"]:
        run_id = run["runId"]
        mutation = await client.execute_async(terminate_mutation, variable_values={"runId": run_id})

        if (
            mutation["terminatePipelineExecution"]["__typename"]
            == "TerminatePipelineExecutionSuccess"
        ):
            if mutation["terminatePipelineExecution"]["run"]["runId"] != run_id:
                print(
                    f"run_id didn't match on success: expected {run_id} "
                    f"but got {mutation['terminatePipelineExecution']['run']['runId']}"
                )
        elif (
            mutation["terminatePipelineExecution"]["__typename"]
            == "TerminatePipelineExecutionFailure"
        ):
            print(
                f"Failed to delete run {run_id}: {mutation['terminatePipelineExecution']['message']}"
            )
        elif mutation["terminatePipelineExecution"]["__typename"] == "PipelineRunNotFoundError":
            print(
                f"Run not found for {run_id}: {mutation['terminatePipelineExecution']['message']}"
            )
        elif mutation["terminatePipelineExecution"]["__typename"] == "PythonError":
            formatted_stack = "\n".join(mutation["terminatePipelineExecution"]["stack"])
            print(
                f"Python error while deleting run {run_id}: {mutation['terminatePipelineExecution']['message']}\n{formatted_stack}"
            )

    result = await client.execute_async(
        runs_query, variable_values={"limit": LIMIT, "filter": enqueued_filter}
    )
    count = result["pipelineRunsOrError"]["count"]
    print(f"{count} runs remaining: {int(time.time() - start)} elapsed")

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions