Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
from typing import Callable, List, Optional, Union

import packaging.version

from dagster_buildkite.defines import (
GCP_CREDS_FILENAME,
GCP_CREDS_LOCAL_FILE,
Expand Down Expand Up @@ -99,13 +97,12 @@ def backcompat_extra_cmds(_, factor: str) -> List[str]:
]


def _infer_user_code_definitions_files(release: str) -> str:
"""Returns `repo.py` if on source or version >=1.0, `legacy_repo.py` otherwise."""
if release == "current_branch":
return "repo.py"
def _infer_user_code_definitions_files(user_code_release: str) -> str:
"""Returns the definitions file to use for the user code release."""
if user_code_release == EARLIEST_TESTED_RELEASE:
return "defs_for_earliest_tested_release.py"
else:
version = packaging.version.parse(release)
return "legacy_repo.py" if version < packaging.version.Version("1.0") else "repo.py"
return "defs_for_latest_release.py"


def _get_library_version(version: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import traceback
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Mapping, Optional, Sequence
from typing import Any, Iterator, Mapping, Optional, Sequence

import dagster._check as check
import docker
import packaging.version
import pytest
import requests
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._utils import (
file_relative_path,
Expand Down Expand Up @@ -46,21 +46,12 @@ def get_library_version(version: str) -> str:
return library_version_from_core_version(version)


def is_0_release(release: str) -> bool:
"""Returns true if on < 1.0 release of dagster, false otherwise."""
if release == "current_branch":
return False
version = packaging.version.parse(release)
return version < packaging.version.Version("1.0")


def infer_user_code_definitions_files(release: str) -> str:
"""Returns `repo.py` if on source or version >=1.0, `legacy_repo.py` otherwise."""
if release == "current_branch":
return "repo.py"
def infer_user_code_definitions_files(user_code_release: str) -> str:
"""Returns the definitions file to use for the user code release."""
if user_code_release == EARLIEST_TESTED_RELEASE:
return "defs_for_earliest_tested_release.py"
else:
version = packaging.version.parse(release)
return "legacy_repo.py" if version < packaging.version.Version("1.0") else "repo.py"
return "defs_for_latest_release.py"


def assert_run_success(client: DagsterGraphQLClient, run_id: str) -> None:
Expand Down Expand Up @@ -263,15 +254,15 @@ def test_backcompat_deployed_pipeline(
graphql_client: DagsterGraphQLClient, release_test_map: Mapping[str, str]
):
# Only run this test on legacy versions
if is_0_release(release_test_map["user_code"]):
if release_test_map["user_code"] == EARLIEST_TESTED_RELEASE:
assert_runs_and_exists(graphql_client, "the_pipeline")


def test_backcompat_deployed_pipeline_subset(
graphql_client: DagsterGraphQLClient, release_test_map: Mapping[str, str]
):
# Only run this test on legacy versions
if is_0_release(release_test_map["user_code"]):
if release_test_map["user_code"] == EARLIEST_TESTED_RELEASE:
assert_runs_and_exists(graphql_client, "the_pipeline", subset_selection=["my_solid"])


Expand All @@ -280,7 +271,7 @@ def test_backcompat_deployed_job(graphql_client: DagsterGraphQLClient):


def test_backcompat_deployed_job_subset(graphql_client: DagsterGraphQLClient):
assert_runs_and_exists(graphql_client, "the_job", subset_selection=["my_op"])
assert_runs_and_exists(graphql_client, "the_job", subset_selection=["the_op"])


def test_backcompat_ping_webserver(graphql_client: DagsterGraphQLClient):
Expand All @@ -290,6 +281,107 @@ def test_backcompat_ping_webserver(graphql_client: DagsterGraphQLClient):
)


REPO_CONTENT_QUERY = """
query {
repositoryOrError(
repositorySelector: {
repositoryLocationName: "test_repo"
repositoryName: "__repository__"
}
) {
... on Repository {
name
assetNodes {
assetKey {
path
}
assetChecksOrError {
... on AssetChecks {
checks {
name
}
}
}
isPartitioned
requiredResources {
resourceKey
}
}
jobs {
name
}
schedules {
name
}
sensors {
name
}
allTopLevelResourceDetails {
name
}
}
}
}
"""


# The purpose of the test is primarily to catch typos introduced when renaming serializable classes.
# For example, if we were to rename `ScheduleSnap` to `FooScheduleSnap`, we would do this:
#
# @whitelist_for_serdes(storage_name="ScheduleSnap")
# class FooScheduleSnap:
# ...
#
# If there were a typo in the `storage_name` "ScheduleSnap" above, this test would fail since a
# serialized ScheduleSnap could not be successfully deserialized.
def test_backcompat_list_repo_contents(
graphql_client: DagsterGraphQLClient,
release_test_map: Mapping[str, str],
):
# Do not run this test on the earliest tested release, since the repo contents are different.
if release_test_map["user_code"] == EARLIEST_TESTED_RELEASE:
pytest.skip("Skipping test for earliest tested release-- repo contents are different")

res = graphql_client._execute( # noqa: SLF001
REPO_CONTENT_QUERY,
variables={},
)
assert res
repo_content = res["repositoryOrError"]

job_names = {job["name"] for job in repo_content["jobs"]}
assert "the_partitioned_job" in job_names
assert "the_job" in job_names
assert "test_graphql" in job_names

asset_keys = {AssetKey(asset["assetKey"]["path"]) for asset in repo_content["assetNodes"]}
assert AssetKey("the_asset") in asset_keys
the_static_partitioned_asset = _get_asset(
repo_content, AssetKey(["the_static_partitioned_asset"])
)
assert the_static_partitioned_asset["isPartitioned"]
the_time_partitioned_asset = _get_asset(repo_content, AssetKey(["the_time_partitioned_asset"]))
assert the_time_partitioned_asset["isPartitioned"]
the_resource_asset = _get_asset(repo_content, AssetKey(["the_resource_asset"]))
assert the_resource_asset["requiredResources"][0]["resourceKey"] == "the_resource"

assert repo_content["schedules"][0]["name"] == "the_schedule"
assert repo_content["sensors"][0]["name"] == "the_sensor"

# definitions only present for the modern repo used with the most recent release
if release_test_map["user_code"] == MOST_RECENT_RELEASE_PLACEHOLDER:
assert repo_content["allTopLevelResourceDetails"][0]["name"] == "the_resource"
assert "the_asset_job" in job_names
the_asset = _get_asset(repo_content, AssetKey(["the_asset"]))
assert the_asset["assetChecksOrError"][0]["checks"][0]["name"] == "the_asset_check"


def _get_asset(repo_content: Mapping[str, Any], asset_key: AssetKey) -> Mapping[str, Any]:
return next(
asset for asset in repo_content["assetNodes"] if asset["assetKey"]["path"] == asset_key.path
)


def assert_runs_and_exists(
client: DagsterGraphQLClient, name: str, subset_selection: Optional[Sequence[str]] = None
):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# type: ignore

# This file is only loaded by old versions of dagster during backcompat testing.
# Backcompat test definitions intended for use with our oldest testest release of Dagster. Does not
# use `Definitions` because it is not available in our oldest supported releases.

from dagster import graph, op, pipeline, repository, solid
from dagster_graphql import DagsterGraphQLClient
Expand All @@ -22,18 +23,21 @@ def the_pipeline():


@op
def my_op():
def the_op():
return 5


@op
def ingest(x):
def the_ingest_op(x):
return x + 5


@graph
def basic():
ingest(my_op())
def the_graph():
the_ingest_op(the_op())


the_job = the_graph.to_job(name="the_job")


@solid
Expand All @@ -50,9 +54,8 @@ def test_graphql():
ping_dagit()


the_job = basic.to_job(name="the_job")


# This is named __repository__ so that it has the same name as a RepositoryDefinition generated from
# a Definitions object.
@repository
def basic_repo():
return [the_job, the_pipeline, test_graphql]
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Backcompat test definitions intended for use with the latest release of Dagster. Should
# include as many unique kinds of Dagster definitions as possible.

from dagster import (
AssetCheckResult,
ConfigurableResource,
DailyPartitionsDefinition,
Definitions,
RunRequest,
StaticPartitionsDefinition,
asset,
asset_check,
define_asset_job,
graph,
job,
op,
schedule,
sensor,
)
from dagster_graphql import DagsterGraphQLClient


@op
def the_op():
return 5


@op
def the_ingest_op(x):
return x + 5


@op
def ping_dagster_webserver():
client = DagsterGraphQLClient(
"dagster_webserver",
port_number=3000,
)
return client._execute("{__typename}") # noqa: SLF001


@graph
def the_graph():
the_ingest_op(the_op())


@job
def test_graphql():
ping_dagster_webserver()


@asset
def the_asset():
return 5


static_partititions_def = StaticPartitionsDefinition(["a", "b"])


@asset(partitions_def=static_partititions_def)
def the_static_partitioned_asset():
return 5


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def the_time_partitioned_asset():
return 5


the_job = the_graph.to_job(name="the_job")


@schedule(cron_schedule="* * * * *", job=the_job)
def the_schedule():
return RunRequest()


@sensor(job=the_job)
def the_sensor():
return RunRequest()


the_partitioned_job = the_graph.to_job(
name="the_partitioned_job", partitions_def=static_partititions_def
)


class TheResource(ConfigurableResource):
foo: str


@asset
def the_resource_asset(the_resource: TheResource):
return 5


the_asset_job = define_asset_job("the_asset_job", [the_asset])


@asset_check(asset=the_asset)
def the_asset_check():
return AssetCheckResult(passed=True)


defs = Definitions(
assets=[
the_asset,
the_static_partitioned_asset,
the_time_partitioned_asset,
the_resource_asset,
],
asset_checks=[the_asset_check],
jobs=[the_job, the_partitioned_job, the_asset_job, test_graphql],
schedules=[the_schedule],
sensors=[the_sensor],
resources={"the_resource": TheResource(foo="foo")},
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ grpcio<1.48.1; python_version < '3.10'
sqlalchemy<2.0.0

# Added pendulum pin in later versions
pendulum<3
pendulum<3
Loading