Commit 6e48e07

[dagster-aws] [docs] add docs for PipesEMRServerlessClient

1 parent dc7262f


File tree: 10 files changed, +386 -0 lines changed


docs/content/api/modules.json.gz (2.5 KB): Binary file not shown.

docs/content/api/searchindex.json.gz (24 Bytes): Binary file not shown.

docs/content/api/sections.json.gz (789 Bytes): Binary file not shown.

@@ -0,0 +1,169 @@
---
title: "Integrating AWS EMR Serverless with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS EMR Serverless to launch external code from Dagster assets."
---

# AWS EMR Serverless & Dagster Pipes

This tutorial gives a short overview of how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS EMR Serverless](https://aws.amazon.com/emr-serverless/).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesEMRServerlessClient" module="dagster_aws.pipes" /> resource, which can be used to launch AWS EMR Serverless jobs from Dagster assets and ops. Dagster can receive events such as logs, asset checks, and asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

---

## Prerequisites

- **In the orchestration environment**, you'll need to:

  - Install the following packages:

    ```shell
    pip install dagster dagster-webserver dagster-aws
    ```

    Refer to the [Dagster installation guide](/getting-started/install) for more info.

- **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html). A quick way to verify that credentials are picked up is shown in the snippet after this list.

- **In AWS**:

  - An existing AWS account
  - An AWS EMR Serverless job. AWS CloudWatch logging has to be enabled in order to receive logs from the job:

    ```json
    {
      "monitoringConfiguration": {
        "cloudWatchLoggingConfiguration": { "enabled": true }
      }
    }
    ```
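
To verify credentials, a minimal boto3 sketch such as the following can be run from the orchestration environment (the STS call is read-only and cheap):

```python
import boto3

# STS GetCallerIdentity succeeds only if boto3 can locate valid
# credentials, making it a quick sanity check
sts = boto3.client("sts")
print(sts.get_caller_identity()["Account"])
```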

---

## Step 1: Install the dagster-pipes module

There are a [few options](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html) available for shipping Python packages to a PySpark job. For example, you can [build a custom Docker image](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/application-custom-image.html) for your EMR job and install the `dagster-pipes` module in it with `pip` in the image's Dockerfile:

```Dockerfile
# start from EMR image
FROM public.ecr.aws/emr-serverless/spark/emr-7.2.0:latest

USER root

RUN python -m pip install dagster-pipes

# copy the job script
COPY . .

USER hadoop
```
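
After the image is built and pushed to ECR, it has to be attached to an EMR Serverless application. A minimal boto3 sketch, assuming the image has already been pushed (the application name and ECR URI are placeholders):

```python
import boto3

emr_serverless = boto3.client("emr-serverless")

# create an application backed by the custom image;
# the release label should match the image's base EMR release
app = emr_serverless.create_application(
    name="dagster-pipes-example",  # hypothetical name
    releaseLabel="emr-7.2.0",
    type="SPARK",
    imageConfiguration={
        "imageUri": "<account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>"
    },
)
print(app["applicationId"])
```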

---

## Step 2: Add dagster-pipes to the EMR Serverless job script

Call `open_dagster_pipes` in the EMR Serverless script to create a context that can be used to send messages to Dagster:

```python file=/guides/dagster/dagster_pipes/emr-serverless/script.py
from dagster_pipes import open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS EMR Serverless!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()


if __name__ == "__main__":
    main()
```
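
Since the intro mentioned asset checks, note that the same Pipes context can report those as well. A short sketch, assuming the corresponding Dagster asset defines a check named `age_is_positive` (a hypothetical name):

```python
# inside the `with open_dagster_pipes() as pipes:` block
pipes.report_asset_check(
    check_name="age_is_positive",  # hypothetical check name
    passed=bool(avg_age > 0),
)
```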

---

## Step 3: Create an asset using the PipesEMRServerlessClient to launch the job

In the Dagster asset/op code, use the `PipesEMRServerlessClient` resource to launch the job:

```python file=/guides/dagster/dagster_pipes/emr-serverless/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
import os

import boto3
from dagster_aws.pipes import PipesEMRServerlessClient

from dagster import AssetExecutionContext, asset


@asset
def emr_serverless_asset(
    context: AssetExecutionContext,
    pipes_emr_serverless_client: PipesEMRServerlessClient,
):
    return pipes_emr_serverless_client.run(
        context=context,
        start_job_run_params={
            "applicationId": "<app-id>",
            "executionRoleArn": "<emr-role>",
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "configurationOverrides": {
                "monitoringConfiguration": {
                    "cloudWatchLoggingConfiguration": {"enabled": True}
                }
            },
        },
    ).get_results()
```

This will launch the AWS EMR Serverless job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
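
The `start_job_run_params` dictionary is passed through to the boto3 EMR Serverless `start_job_run` call, so any of its fields can be supplied. For example, a `jobDriver` pointing at the script from Step 2 might look like the following sketch (the S3 path is a placeholder):

```python
start_job_run_params = {
    "applicationId": "<app-id>",
    "executionRoleArn": "<emr-role>",
    # jobDriver tells EMR Serverless which script to execute
    "jobDriver": {
        "sparkSubmit": {
            "entryPoint": "s3://<your-bucket>/script.py",  # placeholder path
        }
    },
}
```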

---

## Step 4: Create Dagster definitions

Next, add the `PipesEMRServerlessClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/emr-serverless/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions  # noqa


defs = Definitions(
    assets=[emr_serverless_asset],
    resources={"pipes_emr_serverless_client": PipesEMRServerlessClient()},
)
```

Dagster will now be able to launch the AWS EMR Serverless job from the `emr_serverless_asset` asset and receive logs and events from it. With the default `message_reader`, `PipesCloudwatchLogReader`, driver logs will be forwarded to the Dagster process.
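
To try things end to end outside of the webserver, one option is Dagster's `materialize` helper. A sketch, assuming the placeholders in Step 3 have been filled in with a real application ID and role:

```python
from dagster import materialize
from dagster_aws.pipes import PipesEMRServerlessClient

# runs the asset in-process, launching the EMR Serverless job
# and blocking until it completes
result = materialize(
    [emr_serverless_asset],
    resources={"pipes_emr_serverless_client": PipesEMRServerlessClient()},
)
assert result.success
```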

---

## Related

<ArticleList>
  <ArticleListItem
    title="Dagster Pipes"
    href="/concepts/dagster-pipes"
  ></ArticleListItem>
  <ArticleListItem
    title="AWS EMR Serverless Pipes API reference"
    href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesEMRServerlessClient"
  ></ArticleListItem>
</ArticleList>

docs/next/public/objects.inv (6 Bytes): Binary file not shown.

@@ -0,0 +1,19 @@
FROM public.ecr.aws/emr-serverless/spark/emr-7.2.0

USER root

COPY --from=ghcr.io/astral-sh/uv:0.4.7 /uv /bin/uv

ENV UV_SYSTEM_PYTHON=1 \
    UV_BREAK_SYSTEM_PACKAGES=true \
    UV_COMPILE_BYTECODE=1 \
    UV_PYTHON=/usr/bin/python

WORKDIR /app

COPY python_modules/dagster-pipes ./dagster-pipes

RUN uv pip install ./dagster-pipes

# EMR Serverless will run the image as hadoop
USER hadoop:hadoop

@@ -0,0 +1,42 @@
# start_asset_marker
import os

import boto3
from dagster_aws.pipes import PipesEMRServerlessClient

from dagster import AssetExecutionContext, asset


@asset
def emr_serverless_asset(
    context: AssetExecutionContext,
    pipes_emr_serverless_client: PipesEMRServerlessClient,
):
    return pipes_emr_serverless_client.run(
        context=context,
        start_job_run_params={
            "applicationId": "<app-id>",
            "executionRoleArn": "<emr-role>",
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "configurationOverrides": {
                "monitoringConfiguration": {
                    "cloudWatchLoggingConfiguration": {"enabled": True}
                }
            },
        },
    ).get_results()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions  # noqa


defs = Definitions(
    assets=[emr_serverless_asset],
    resources={"pipes_emr_serverless_client": PipesEMRServerlessClient()},
)

# end_definitions_marker

@@ -0,0 +1,62 @@
import os
import sys

import boto3
import pyspark.sql.functions as F
from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesS3ContextLoader,
    open_dagster_pipes,
)
from pyspark.sql import SparkSession


def main():
    spark = SparkSession.builder.appName("WordCount").getOrCreate()

    output_path = None

    if len(sys.argv) > 1:
        output_path = sys.argv[1]
    else:
        print(  # noqa
            "S3 output location not specified, printing top 10 results to output stream"
        )

    region = os.getenv("AWS_REGION")
    text_file = spark.sparkContext.textFile(
        "s3://" + region + ".elasticmapreduce/emr-containers/samples/wordcount/input"
    )
    counts = (
        text_file.flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], False)
    )
    counts_df = counts.toDF(["word", "count"])

    if output_path:
        counts_df.write.mode("overwrite").csv(output_path)
        print(  # noqa
            "WordCount job completed successfully. Refer to output at S3 path: "
            + output_path
        )
    else:
        counts_df.show(10, False)
        print("WordCount job completed successfully.")  # noqa

    spark.stop()


if __name__ == "__main__":
    """
    Usage: wordcount [destination path]
    """

    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS EMR Serverless job!")
        pipes.report_asset_materialization(
            metadata={"some_metric": {"raw_value": 0, "type": "int"}},
            data_version="alpha",
        )
        main()

@@ -0,0 +1,29 @@
from dagster_pipes import open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS EMR Serverless!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()


if __name__ == "__main__":
    main()

@@ -0,0 +1,65 @@
# this script can be used to pack and upload a python virtualenv to an s3 bucket
# requires `uv` and `tar`

import argparse
import os
import subprocess
import tempfile
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent
REQUIREMENTS_TXT = SCRIPT_DIR / "requirements.txt"
DAGSTER_DIR = Path(*SCRIPT_DIR.parts[: SCRIPT_DIR.parts.index("examples")])

DAGSTER_PIPES_DIR = DAGSTER_DIR / "python_modules/dagster-pipes"

parser = argparse.ArgumentParser(description="Upload a python virtualenv to an s3 path")
parser.add_argument(
    "--python", type=str, help="python version to use", default="3.10.8"
)
parser.add_argument(
    "--requirements",
    type=str,
    help="path to the requirements.txt file",
    default=str(REQUIREMENTS_TXT),
)
parser.add_argument("--s3-path", type=str, help="s3 path to copy to", required=True)


def main():
    args = parser.parse_args()

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        subprocess.run(
            f"uv python install --python-preference only-managed {args.python}",
            shell=True,
            check=True,
        )
        subprocess.run(
            f"uv venv --seed --relocatable --python-preference only-managed --python {args.python}",
            shell=True,
            check=True,
        )
        # setting VIRTUAL_ENV is enough to point uv at the new venv;
        # "sourcing" the activate script in a subprocess would have no
        # effect on subsequent subprocess calls
        os.environ["VIRTUAL_ENV"] = str(Path(temp_dir) / ".venv")
        subprocess.run(
            f"uv pip install --link-mode clone {DAGSTER_PIPES_DIR}",
            shell=True,
            check=True,
        )
        subprocess.run(
            "tar -czf pyspark_venv.tar.gz -C .venv .",
            shell=True,
            check=True,
        )
        subprocess.run(
            f"aws s3 cp {temp_dir}/pyspark_venv.tar.gz {args.s3_path}",
            shell=True,
            check=True,
        )


if __name__ == "__main__":
    main()
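
The archive produced by this script is consumed by EMR Serverless through Spark's archive mechanism. A sketch of the relevant `start_job_run` parameters, following the `spark.archives` pattern from the AWS docs (bucket and paths are placeholders):

```python
venv_s3_path = "s3://<your-bucket>/pyspark_venv.tar.gz"  # placeholder

start_job_run_params = {
    "applicationId": "<app-id>",
    "executionRoleArn": "<emr-role>",
    "jobDriver": {
        "sparkSubmit": {
            "entryPoint": "s3://<your-bucket>/script.py",  # placeholder
            # unpack the archive as ./environment and point PySpark at
            # the Python interpreter inside it
            "sparkSubmitParameters": (
                f"--conf spark.archives={venv_s3_path}#environment "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python "
                "--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
            ),
        }
    },
}
```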
