Skip to content

Commit 3fcb9ad

Browse files
committed
Merge branch 'master' of https://github.com/dagster-io/dagster into salazarm/status-selection-syntax
2 parents dbef2e2 + 7b24cc9 commit 3fcb9ad

16 files changed

+152
-32
lines changed

docs/docs/integrations/libraries/fivetran.md

+14
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ Definitions from multiple Fivetran workspaces can be combined by instantiating m
127127
language="python"
128128
/>
129129

130+
### Define upstream dependencies
131+
132+
By default, Dagster does not set upstream dependencies when generating asset specs for your Fivetran assets. You can set upstream dependencies on your Fivetran assets by passing an instance of the custom <PyObject section="libraries" module="dagster_fivetran" object="DagsterFivetranTranslator" /> to the <PyObject section="libraries" module="dagster_fivetran" object="load_fivetran_asset_specs" /> function.
133+
134+
<CodeExample
135+
startAfter="start_upstream_asset"
136+
endBefore="end_upstream_asset"
137+
path="docs_snippets/docs_snippets/integrations/fivetran/define_upstream_dependencies.py"
138+
/>
139+
140+
Note that `super()` is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.
141+
142+
You can pass an instance of the custom <PyObject section="libraries" module="dagster_fivetran" object="DagsterFivetranTranslator" /> to the <PyObject section="libraries" module="dagster_fivetran" object="fivetran_assets" /> decorator or the <PyObject section="libraries" module="dagster_fivetran" object="build_fivetran_assets_definitions" /> factory.
143+
130144
### Define downstream dependencies
131145

132146
Dagster allows you to define assets that are downstream of specific Fivetran tables using their asset keys. The asset key for a Fivetran table can be retrieved using the asset definitions created using the <PyObject section="libraries" module="dagster_fivetran" object="fivetran_assets" /> decorator. The below example defines `my_downstream_asset` as a downstream dependency of `my_fivetran_table`:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from dagster_fivetran import (
2+
DagsterFivetranTranslator,
3+
FivetranConnectorTableProps,
4+
FivetranWorkspace,
5+
load_fivetran_asset_specs,
6+
)
7+
8+
import dagster as dg
9+
10+
fivetran_workspace = FivetranWorkspace(
11+
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
12+
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
13+
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
14+
)
15+
16+
17+
# start_upstream_asset
18+
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
19+
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
20+
# We create the default asset spec using super()
21+
default_spec = super().get_asset_spec(props)
22+
# We set an upstream dependency for our assets
23+
return default_spec.replace_attributes(deps=["my_upstream_asset_key"])
24+
25+
26+
fivetran_specs = load_fivetran_asset_specs(
27+
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
28+
)
29+
30+
# end_upstream_asset
31+
32+
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})

examples/docs_snippets/docs_snippets_tests/test_integration_files_load.py

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
f"{snippets_folder}/fivetran/customize_fivetran_asset_defs.py",
2323
f"{snippets_folder}/fivetran/customize_fivetran_translator_asset_spec.py",
2424
f"{snippets_folder}/fivetran/define_downstream_dependencies.py",
25+
f"{snippets_folder}/fivetran/define_upstream_dependencies.py",
2526
f"{snippets_folder}/fivetran/multiple_fivetran_workspaces.py",
2627
f"{snippets_folder}/fivetran/representing_fivetran_assets.py",
2728
f"{snippets_folder}/fivetran/select_fivetran_connectors.py",

js_modules/dagster-ui/packages/ui-core/src/assets/LaunchAssetChoosePartitionsDialog.tsx

+5-3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import {
6060
} from '../instance/backfill/types/BackfillUtils.types';
6161
import {fetchTagsAndConfigForAssetJob} from '../launchpad/ConfigFetch';
6262
import {TagContainer, TagEditor} from '../launchpad/TagEditor';
63+
import {tagsWithUIExecutionTags} from '../launchpad/uiExecutionTags';
6364
import {
6465
DAEMON_NOT_RUNNING_ALERT_INSTANCE_FRAGMENT,
6566
DaemonNotRunningAlert,
@@ -311,10 +312,11 @@ const LaunchAssetChoosePartitionsDialogBody = ({
311312
};
312313

313314
const onLaunchAsBackfill = async () => {
315+
const backfillTags = tagsWithUIExecutionTags(tags);
314316
const backfillParams: LaunchBackfillParams =
315317
target.type === 'job' && !isHiddenAssetGroupJob(target.jobName)
316318
? {
317-
tags,
319+
tags: backfillTags,
318320
assetSelection: assets.map(asAssetKeyInput),
319321
partitionNames: keysFiltered,
320322
fromFailure: false,
@@ -329,12 +331,12 @@ const LaunchAssetChoosePartitionsDialogBody = ({
329331
}
330332
: target.type === 'pureAll'
331333
? {
332-
tags,
334+
tags: backfillTags,
333335
assetSelection: assets.map(asAssetKeyInput),
334336
allPartitions: true,
335337
}
336338
: {
337-
tags,
339+
tags: backfillTags,
338340
assetSelection: assets.map(asAssetKeyInput),
339341
partitionNames: keysFiltered,
340342
fromFailure: false,

js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/LaunchAssetExecutionButton.test.tsx

+16-11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {CustomAlertProvider} from '../../app/CustomAlertProvider';
77
import {CustomConfirmationProvider} from '../../app/CustomConfirmationProvider';
88
import {displayNameForAssetKey} from '../../asset-graph/Utils';
99
import {LaunchPartitionBackfillMutation} from '../../instance/backfill/types/BackfillUtils.types';
10+
import {UI_EXECUTION_TAGS} from '../../launchpad/uiExecutionTags';
1011
import {LaunchPipelineExecutionMutation} from '../../runs/types/RunUtils.types';
1112
import {TestProvider} from '../../testing/TestProvider';
1213
import {buildWorkspaceMocks} from '../../workspace/WorkspaceContext/__fixtures__/Workspace.fixtures';
@@ -190,7 +191,7 @@ describe('LaunchAssetExecutionButton', () => {
190191
const launchMock = buildExpectedLaunchSingleRunMutation({
191192
mode: 'default',
192193
executionMetadata: {
193-
tags: [],
194+
tags: [...UI_EXECUTION_TAGS],
194195
},
195196
runConfigData: '{}',
196197
selector: {
@@ -214,7 +215,7 @@ describe('LaunchAssetExecutionButton', () => {
214215
it('should not include checks if the job in context is marked without_checks', async () => {
215216
const launchMock = buildExpectedLaunchSingleRunMutation({
216217
mode: 'default',
217-
executionMetadata: {tags: []},
218+
executionMetadata: {tags: [...UI_EXECUTION_TAGS]},
218219
runConfigData: '{}',
219220
selector: {
220221
repositoryLocationName: 'test.py',
@@ -236,7 +237,7 @@ describe('LaunchAssetExecutionButton', () => {
236237
it('should include checks if the job in context includes them', async () => {
237238
const launchMock = buildExpectedLaunchSingleRunMutation({
238239
mode: 'default',
239-
executionMetadata: {tags: []},
240+
executionMetadata: {tags: [...UI_EXECUTION_TAGS]},
240241
runConfigData: '{}',
241242
selector: {
242243
repositoryLocationName: 'test.py',
@@ -275,7 +276,7 @@ describe('LaunchAssetExecutionButton', () => {
275276
const launchMock = buildExpectedLaunchSingleRunMutation({
276277
mode: 'default',
277278
executionMetadata: {
278-
tags: [],
279+
tags: [...UI_EXECUTION_TAGS],
279280
},
280281
runConfigData: '{}',
281282
selector: {
@@ -327,6 +328,7 @@ describe('LaunchAssetExecutionButton', () => {
327328
tags: [
328329
{key: 'dagster/partition', value: '2023-02-22'},
329330
{key: 'dagster/partition_set', value: 'my_asset_job_partition_set'},
331+
...UI_EXECUTION_TAGS,
330332
],
331333
},
332334
mode: 'default',
@@ -360,7 +362,7 @@ describe('LaunchAssetExecutionButton', () => {
360362
assetSelection: [{path: ['asset_daily']}],
361363
partitionNames: ASSET_DAILY_PARTITION_KEYS,
362364
fromFailure: false,
363-
tags: [],
365+
tags: [...UI_EXECUTION_TAGS],
364366
});
365367
renderButton({
366368
scope: {all: [ASSET_DAILY]},
@@ -400,7 +402,7 @@ describe('LaunchAssetExecutionButton', () => {
400402
assetSelection: [{path: ['asset_daily']}],
401403
partitionNames: ASSET_DAILY_PARTITION_KEYS_MISSING,
402404
fromFailure: false,
403-
tags: [],
405+
tags: [...UI_EXECUTION_TAGS],
404406
});
405407
renderButton({
406408
scope: {all: [ASSET_DAILY]},
@@ -437,6 +439,7 @@ describe('LaunchAssetExecutionButton', () => {
437439
tags: [
438440
{key: 'dagster/partition', value: '2023-02-22'},
439441
{key: 'dagster/partition_set', value: '__ASSET_JOB_partition_set'},
442+
...UI_EXECUTION_TAGS,
440443
],
441444
},
442445
mode: 'default',
@@ -472,7 +475,7 @@ describe('LaunchAssetExecutionButton', () => {
472475
assetSelection: [{path: ['asset_daily']}],
473476
partitionNames: ASSET_DAILY_PARTITION_KEYS,
474477
fromFailure: false,
475-
tags: [],
478+
tags: [...UI_EXECUTION_TAGS],
476479
});
477480
renderButton({
478481
scope: {all: [ASSET_DAILY]},
@@ -503,6 +506,7 @@ describe('LaunchAssetExecutionButton', () => {
503506
tags: [
504507
{key: 'dagster/asset_partition_range_start', value: '2020-01-02'},
505508
{key: 'dagster/asset_partition_range_end', value: '2023-02-22'},
509+
...UI_EXECUTION_TAGS,
506510
],
507511
},
508512
runConfigData: '{}\n',
@@ -540,6 +544,7 @@ describe('LaunchAssetExecutionButton', () => {
540544
tags: [
541545
{key: 'dagster/asset_partition_range_start', value: '2020-01-02'},
542546
{key: 'dagster/asset_partition_range_end', value: '2023-02-22'},
547+
...UI_EXECUTION_TAGS,
543548
],
544549
},
545550
runConfigData: '{}\n',
@@ -572,7 +577,7 @@ describe('LaunchAssetExecutionButton', () => {
572577
assetSelection: [{path: ['asset_daily']}, {path: ['asset_weekly']}],
573578
partitionNames: ASSET_DAILY_PARTITION_KEYS,
574579
fromFailure: false,
575-
tags: [],
580+
tags: [...UI_EXECUTION_TAGS],
576581
});
577582

578583
renderButton({
@@ -604,7 +609,7 @@ describe('LaunchAssetExecutionButton', () => {
604609
assetSelection: [{path: ['asset_daily']}, {path: ['asset_weekly']}],
605610
partitionNames: ASSET_DAILY_PARTITION_KEYS,
606611
fromFailure: false,
607-
tags: [],
612+
tags: [...UI_EXECUTION_TAGS],
608613
});
609614

610615
renderButton({
@@ -624,7 +629,7 @@ describe('LaunchAssetExecutionButton', () => {
624629

625630
it('should offer to materialize all partitions if roots have different partition defintions ("pureAll" case)', async () => {
626631
const LaunchPureAllMutationMock = buildExpectedLaunchBackfillMutation({
627-
tags: [],
632+
tags: [...UI_EXECUTION_TAGS],
628633
assetSelection: [
629634
{path: ['asset_daily']},
630635
{path: ['asset_weekly']},
@@ -658,7 +663,7 @@ describe('LaunchAssetExecutionButton', () => {
658663
it('should present a warning if pre-flight check indicates other asset keys are required', async () => {
659664
const launchMock = buildExpectedLaunchSingleRunMutation({
660665
mode: 'default',
661-
executionMetadata: {tags: []},
666+
executionMetadata: {tags: [...UI_EXECUTION_TAGS]},
662667
runConfigData: '{}',
663668
selector: {
664669
repositoryLocationName: 'test.py',
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import {ExecutionParams, ExecutionTag} from '../graphql/types';
2+
import {DagsterTag} from '../runs/RunTag';
3+
4+
export const UI_EXECUTION_TAGS: ExecutionTag[] = [{key: DagsterTag.FromUI, value: 'true'}];
5+
6+
export function tagsWithUIExecutionTags(tags: ExecutionTag[] | null | undefined) {
7+
return [
8+
...(tags || []).filter((t) => !UI_EXECUTION_TAGS.some((a) => a.key === t.key)),
9+
...UI_EXECUTION_TAGS,
10+
];
11+
}
12+
13+
export function paramsWithUIExecutionTags(params: ExecutionParams) {
14+
return {
15+
...params,
16+
executionMetadata: {
17+
...params.executionMetadata,
18+
tags: tagsWithUIExecutionTags(params.executionMetadata?.tags),
19+
},
20+
};
21+
}

js_modules/dagster-ui/packages/ui-core/src/launchpad/useLaunchMultipleRunsWithTelemetry.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {useHistory} from 'react-router-dom';
33

44
import {showLaunchError} from './showLaunchError';
55
import {useMutation} from '../apollo-client';
6+
import {paramsWithUIExecutionTags} from './uiExecutionTags';
67
import {TelemetryAction, useTelemetryAction} from '../app/Telemetry';
78
import {
89
LAUNCH_MULTIPLE_RUNS_MUTATION,
@@ -49,7 +50,15 @@ export function useLaunchMultipleRunsWithTelemetry() {
4950
opSelection: undefined,
5051
};
5152

52-
const result = (await launchMultipleRuns({variables})).data?.launchMultipleRuns;
53+
const finalized = {
54+
...variables,
55+
executionParamsList: Array.isArray(variables.executionParamsList)
56+
? variables.executionParamsList.map(paramsWithUIExecutionTags)
57+
: paramsWithUIExecutionTags(variables.executionParamsList),
58+
};
59+
60+
const result = (await launchMultipleRuns({variables: finalized})).data?.launchMultipleRuns;
61+
5362
if (result) {
5463
handleLaunchMultipleResult(result, history, {behavior});
5564
logTelemetry(

js_modules/dagster-ui/packages/ui-core/src/launchpad/useLaunchWithTelemetry.oss.ts

+9-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {useHistory} from 'react-router-dom';
33

44
import {showLaunchError} from './showLaunchError';
55
import {useMutation} from '../apollo-client';
6+
import {paramsWithUIExecutionTags} from './uiExecutionTags';
67
import {TelemetryAction, useTelemetryAction} from '../app/Telemetry';
78
import {
89
LAUNCH_PIPELINE_EXECUTION_MUTATION,
@@ -25,21 +26,23 @@ export function useLaunchWithTelemetry() {
2526
const history = useHistory();
2627

2728
return useCallback(
28-
async (variables: LaunchPipelineExecutionMutationVariables, behavior: LaunchBehavior) => {
29-
const jobName =
30-
variables.executionParams.selector.jobName ||
31-
variables.executionParams.selector.pipelineName;
29+
async (
30+
{executionParams, ...rest}: LaunchPipelineExecutionMutationVariables,
31+
behavior: LaunchBehavior,
32+
) => {
33+
const jobName = executionParams.selector.jobName || executionParams.selector.pipelineName;
3234

3335
if (!jobName) {
3436
return;
3537
}
3638

3739
const metadata: {[key: string]: string | null | undefined} = {
3840
jobName,
39-
opSelection: variables.executionParams.selector.solidSelection ? 'provided' : undefined,
41+
opSelection: executionParams.selector.solidSelection ? 'provided' : undefined,
4042
};
4143

42-
const result = await launchPipelineExecution({variables});
44+
const finalized = {executionParams: paramsWithUIExecutionTags(executionParams), ...rest};
45+
const result = await launchPipelineExecution({variables: finalized});
4346
logTelemetry(TelemetryAction.LAUNCH_RUN, metadata);
4447
try {
4548
handleLaunchResult(jobName, result.data?.launchPipelineExecution, history, {behavior});

js_modules/dagster-ui/packages/ui-core/src/partitions/BackfillSelector.tsx

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import {
4343
} from '../instance/backfill/types/BackfillUtils.types';
4444
import {LaunchButton} from '../launchpad/LaunchButton';
4545
import {TagContainer, TagEditor} from '../launchpad/TagEditor';
46+
import {tagsWithUIExecutionTags} from '../launchpad/uiExecutionTags';
4647
import {explodeCompositesInHandleGraph} from '../pipelines/CompositeSupport';
4748
import {GRAPH_EXPLORER_SOLID_HANDLE_FRAGMENT} from '../pipelines/GraphExplorer';
4849
import {GraphQueryInput} from '../ui/GraphQueryInput';
@@ -388,7 +389,7 @@ const LaunchBackfillButton = ({
388389
partitionNames,
389390
reexecutionSteps,
390391
fromFailure,
391-
tags,
392+
tags: tagsWithUIExecutionTags(tags),
392393
},
393394
},
394395
});

js_modules/dagster-ui/packages/ui-core/src/runs/ReexecutionDialog.tsx

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
LaunchPipelineReexecutionMutationVariables,
2424
} from './types/RunUtils.types';
2525
import {ExecutionTag, ReexecutionStrategy} from '../graphql/types';
26+
import {tagsWithUIExecutionTags} from '../launchpad/uiExecutionTags';
2627

2728
export interface ReexecutionDialogProps {
2829
isOpen: boolean;
@@ -179,7 +180,7 @@ export const ReexecutionDialog = (props: ReexecutionDialogProps) => {
179180
dispatch({type: 'start'});
180181

181182
const runList = Object.keys(state.frozenRuns);
182-
const extraTags = extraTagsValidated.toSave.length ? extraTagsValidated.toSave : undefined;
183+
const extraTags = tagsWithUIExecutionTags(extraTagsValidated.toSave);
183184

184185
for (const runId of runList) {
185186
const {data} = await reexecute({

js_modules/dagster-ui/packages/ui-core/src/runs/RunTag.tsx

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export enum DagsterTag {
3333
AssetEvaluationID = 'dagster/asset_evaluation_id',
3434
SnapshotID = 'dagster/snapshot_id', // This only exists on the client, not the server.
3535
ReportingUser = 'dagster/reporting_user',
36+
FromUI = 'dagster/from_ui',
3637
User = 'user',
3738

3839
// Hidden tags (using ".dagster" HIDDEN_TAG_PREFIX)

0 commit comments

Comments
 (0)