pyspark-flex-etl-job.ts
import { Construct } from 'constructs';
import { JobType, GlueVersion, JobLanguage, PythonVersion, WorkerType, ExecutionClass } from '../constants';
import { Code } from '../code';
import { SparkJob, SparkJobProps } from './spark-job';
import { addConstructMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';
import { Job } from './job';
/**
* Properties for PySparkFlexEtlJob
*/
export interface PySparkFlexEtlJobProps extends SparkJobProps {
/**
* Extra Python files (optional)
* Additional Python files that AWS Glue adds to the Python path before running the job script
*
* @default - no extra files
*/
readonly extraPythonFiles?: Code[];
}
/**
* Flex Jobs class
*
* Flex jobs support the Python and Scala languages.
* The flexible execution class is appropriate for non-urgent jobs such as
* pre-production jobs, testing, and one-time data loads.
* Flexible job runs are supported for jobs using AWS Glue version 3.0 or later and the G.1X or
* G.2X worker types, but will default to the latest version of Glue (currently Glue 3.0).
*
* As with standard ETL jobs, the following arguments are enabled by default:
* --enable-metrics, --enable-spark-ui, --enable-continuous-cloudwatch-log
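*
* A minimal usage sketch (assuming the `role` and `script` props inherited from
* SparkJobProps; everything else falls back to the defaults described above):
*
* @example
* declare const role: iam.IRole;
* declare const script: glue.Code;
* new glue.PySparkFlexEtlJob(this, 'PySparkFlexEtlJob', {
*   role,
*   script,
* });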
*/
export class PySparkFlexEtlJob extends SparkJob {
/**
* The ARN of the Glue job
*/
public readonly jobArn: string;
/**
* The name of the Glue job
*/
public readonly jobName: string;
/**
* PySparkFlexEtlJob constructor
*/
constructor(scope: Construct, id: string, props: PySparkFlexEtlJobProps) {
super(scope, id, props);
// Enhanced CDK Analytics Telemetry
addConstructMetadata(this, props);
// Combine command-line arguments into a single map of default arguments
const defaultArguments = {
...this.executableArguments(props),
...this.nonExecutableCommonArguments(props),
};
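// Create the underlying Glue job resource with the FLEX execution class and the PySpark ETL defaults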
const jobResource = Job.setupJobResource(this, props, {
role: this.role!.roleArn,
command: {
name: JobType.ETL,
scriptLocation: this.codeS3ObjectUrl(props.script),
pythonVersion: PythonVersion.THREE,
},
glueVersion: props.glueVersion ? props.glueVersion : GlueVersion.V3_0,
workerType: props.workerType ? props.workerType : WorkerType.G_1X,
numberOfWorkers: props.numberOfWorkers ? props.numberOfWorkers : 10,
maxRetries: props.maxRetries,
executionClass: ExecutionClass.FLEX,
jobRunQueuingEnabled: false,
defaultArguments,
});
const resourceName = this.getResourceNameAttribute(jobResource.ref);
this.jobArn = Job.buildJobArn(this, resourceName);
this.jobName = resourceName;
}
/**
* Set the executable arguments with best practices enabled by default
*
* @returns A map of arguments for Glue to use on execution
*/
private executableArguments(props: PySparkFlexEtlJobProps) {
const args: { [key: string]: string } = {};
args['--job-language'] = JobLanguage.PYTHON;
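// Add arguments for any extra code assets supplied via props (e.g. extraPythonFiles)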
this.setupExtraCodeArguments(args, props);
return args;
}
}