
Commit 6ce664e

Merge pull request #10 from aelzeiny/submit-job-kwargs
Add more extensibility to AWS Batch
2 parents 5cfd688 + 320fe08 commit 6ce664e

File tree: 3 files changed (+67 -17 lines)

.travis.yml (+7)
```diff
@@ -1,8 +1,15 @@
+dist: xenial
 language: python
 python:
 - "3.6" # current default Python on Travis CI
 - "3.7"
 - "3.8"
+before_install:
+- sudo apt-get autoremove sqlite3
+- sudo apt-get install python-software-properties
+- sudo apt-add-repository -y ppa:linuxgndu/sqlite-nightly
+- sudo apt-get -y update
+- sudo apt-cache show sqlite3
 install:
 - pip install apache-airflow boto3 pylint isort marshmallow
 env:
```

airflow_aws_executors/batch_executor.py (+28 -6)
```diff
@@ -125,17 +125,20 @@ def execute_async(self, key: TaskInstanceKeyType, command: CommandType, queue=No
         """
         if executor_config and 'command' in executor_config:
             raise ValueError('Executor Config should never override "command"')
-        job_id = self._submit_job(command, executor_config or {})
+        job_id = self._submit_job(key, command, queue, executor_config or {})
         self.active_workers.add_job(job_id, key)
 
-    def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
+    def _submit_job(
+            self,
+            key: TaskInstanceKeyType,
+            cmd: CommandType, queue: str,
+            exec_config: ExecutorConfigType
+    ) -> str:
         """
         The command and executor config will be placed in the container-override section of the JSON request, before
-        calling Boto3's "run_task" function.
+        calling Boto3's "submit_job" function.
         """
-        submit_job_api = deepcopy(self.submit_job_kwargs)
-        submit_job_api['containerOverrides'].update(exec_config)
-        submit_job_api['containerOverrides']['command'] = cmd
+        submit_job_api = self._submit_job_kwargs(key, cmd, queue, exec_config)
         boto_run_task = self.batch.submit_job(**submit_job_api)
         try:
             submit_job_response = BatchSubmitJobResponseSchema().load(boto_run_task)
```
```diff
@@ -149,6 +152,25 @@ def _submit_job(self, cmd: CommandType, exec_config: ExecutorConfigType) -> str:
         )
         return submit_job_response['job_id']
 
+    def _submit_job_kwargs(
+            self,
+            key: TaskInstanceKeyType,
+            cmd: CommandType,
+            queue: str, exec_config: ExecutorConfigType
+    ) -> dict:
+        """
+        This modifies the standard kwargs to be specific to this task by overriding the airflow command and updating
+        the container overrides.
+
+        One last chance to modify Boto3's "submit_job" kwarg params before it gets passed into the Boto3 client.
+        For the latest kwarg parameters:
+        .. seealso:: https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
+        """
+        submit_job_api = deepcopy(self.submit_job_kwargs)
+        submit_job_api['containerOverrides'].update(exec_config)
+        submit_job_api['containerOverrides']['command'] = cmd
+        return submit_job_api
+
     def end(self, heartbeat_interval=10):
         """
         Waits for all currently running tasks to end, and doesn't launch any tasks
```
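
For context on what this refactor feeds into: the dict returned by `_submit_job_kwargs` is passed verbatim to Boto3's `submit_job`. A minimal sketch of an equivalent direct call; every name and value below is an illustrative placeholder, not this project's defaults:

```python
import boto3

batch = boto3.client('batch')
response = batch.submit_job(
    jobName='airflow-task',               # illustrative placeholder
    jobQueue='my-airflow-job-queue',      # illustrative placeholder
    jobDefinition='airflow-job-def:1',    # illustrative placeholder
    containerOverrides={
        # the executor sets 'command' itself and merges in whatever the
        # task supplied via executor_config (e.g. vcpus, memory)
        'command': ['airflow', 'run', 'my_dag', 'my_task', '2021-01-01'],
        'vcpus': 1,
        'memory': 512,
    },
)
job_id = response['jobId']  # the value the executor tracks per task
```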

readme.md (+32 -11)
````diff
@@ -14,7 +14,7 @@ For `AWS ECS/Fargate`: [Getting Started with AWS ECS/Fargate ReadMe](getting_sta
 
 
 ## How Airflow Executors Work
-Every time Apache Airflow wants to run a task, the Scheduler generates a CLI command that needs to be executed **somewhere**.
+Every time Apache Airflow wants to run a task, the Scheduler generates a shell command that needs to be executed **somewhere**.
 Under the hood this command will run Python code, and it looks something like this:
 ```bash
 airflow run <DAG_ID> <TASK_ID> <EXECUTION_DATE>
````
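
Worth noting for readers of the executor code above: by the time this shell command reaches `execute_async`, it has already been split into an argument list (the `CommandType` type hint). A hedged sketch of that shape, with illustrative values:

```python
from typing import List

CommandType = List[str]  # matches the alias used in the executor's type hints

# The Scheduler-generated command, as the executor receives it:
command: CommandType = ['airflow', 'run', 'my_dag', 'my_task', '2021-01-01T00:00:00']
```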
````diff
@@ -61,28 +61,29 @@ The Celery Backend and worker queue also need attention and maintenance. I've tr
 triggering CloudWatch Events, triggering capacity providers, triggering Application Autoscaling groups,
 and it was a mess that I never got to work properly.
 
-#### The Case for AWS Fargate
+#### The Case for AWS Batch on AWS Fargate, and AWS Fargate
 If you're on the Fargate executor it may take ~2.5 minutes for a task to pop up, but at least it's a constant O(1) time.
 This way, the concept of tracking DAG Landing Times becomes unnecessary.
 If you have more than 2000 concurrent tasks (which is a lot) then you can always contact AWS to provide an increase in this soft-limit.
 
 
 ## AWS Batch v AWS ECS v AWS Fargate?
+**I almost always recommend that you go the AWS Batch route**, especially since, as of Dec 2020, AWS Batch supports Fargate deployments. Unless you need some very custom flexibility provided by ECS, or have a particular reason to use AWS Fargate directly, go with AWS Batch.
+
 `AWS Batch` - Is built on top of ECS, but has additional features for batch-job management, including auto-scaling servers on an ECS cluster up and down based on jobs submitted to a queue. It is generally easier to configure and set up than either alternative.
 
 `AWS Fargate` - Is a serverless container orchestration service; comparable to a proprietary AWS version of Kubernetes. Launching a Fargate Task is like saying "I want these containers to be launched somewhere in the cloud with X CPU and Y memory, and I don't care about the server". AWS Fargate is built on top of AWS ECS, and is easier to manage and maintain. However, it provides less flexibility.
 
 `AWS ECS` - Is known as "Elastic Container Service", which is a container orchestration service that uses a designated cluster of EC2 instances that you operate, own, and maintain.
 
-I almost always recommend that you go the AWS Batch or AWS Fargate route unless you need some very custom flexibility provided by ECS.
 
 | | Batch | Fargate | ECS |
 |-------------------|-------------------------------------------------------------------------------------|---------------------------------------------|---------------------------------------------------|
-| Start-up per task | Instant 3s, if capacity is available; otherwise 2-3 minutes to launch a new server | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
-| Maintenance | You own, operate, and patch the servers | Serverless | You own, operate, and patch the servers |
+| Start-up per task | Either, depending on whether the job queue's compute environment is Fargate (serverless) or EC2 | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available. |
+| Maintenance | You own, operate, and patch the servers, OR serverless (as of Dec 2020) | Serverless | You own, operate, and patch the servers |
 | Capacity | Autoscales to configurable Max vCPUs in compute environment | ~2000 containers. See AWS Limits | Fixed. Not auto-scaling. |
-| Flexibility | High. Almost anything that you can do on an EC2 | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
-| Fractional CPUs? | No. Each task has 1 vCPU. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |
+| Flexibility | Either, depending on whether the job queue's compute environment is Fargate (serverless) or EC2 | Low. Can only do what AWS allows in Fargate | High. Almost anything that you can do on an EC2 |
+| Fractional CPUs? | Yes, as of Dec 2020 a task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. | Yes. A task can have 0.25 vCPUs. |
 
 
 ## Optional Container Requirements
````
````diff
@@ -98,10 +99,8 @@ task = PythonOperator(
     python_callable=lambda *args, **kwargs: print('hello world'),
     task_id='say_hello',
     executor_config=dict(
-        containerOverrides=dict(
-            vcpus=1, # no fractional CPUs
-            memory=512
-        )
+        vcpus=1,
+        memory=512
     ),
     dag=dag
 )
````
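
The extra `containerOverrides` nesting can be dropped because the executor now merges `executor_config` into the `containerOverrides` section itself, as `_submit_job_kwargs` above shows. A small sketch of that merge, mirroring the code in this commit (the base kwargs shape is illustrative):

```python
from copy import deepcopy

submit_job_kwargs = {'containerOverrides': {}}   # base kwargs (shape illustrative)
exec_config = dict(vcpus=1, memory=512)          # from the operator above
cmd = ['airflow', 'run', 'my_dag', 'say_hello', '2021-01-01']

submit_job_api = deepcopy(submit_job_kwargs)
submit_job_api['containerOverrides'].update(exec_config)  # flat merge
submit_job_api['containerOverrides']['command'] = cmd
# -> {'containerOverrides': {'vcpus': 1, 'memory': 512, 'command': [...]}}
```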
````diff
@@ -220,6 +219,17 @@ CUSTOM_SUBMIT_JOB_KWARGS['retryStrategy'] = {'attempts': 3}
 CUSTOM_SUBMIT_JOB_KWARGS['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
 ```
 
+"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"
+
+```python
+class CustomBatchExecutor(AwsBatchExecutor):
+    def _submit_job_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
+        submit_job_api = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
+        if queue == 'long_tasks_queue':
+            submit_job_api['retryStrategy'] = {'attempts': 3}
+            submit_job_api['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
+        return submit_job_api
+```
 
 #### AWS ECS/Fargate
 In this example we will modify the default `submit_job_kwargs`. Note, however, there is nothing that's stopping us
````
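
One way to sanity-check a subclass like the `CustomBatchExecutor` above without standing up Airflow is to build a bare instance and call the hook directly. This is only a sketch: it bypasses `__init__` (which would load Airflow configuration) and fakes the base `submit_job_kwargs`, so every name here is an assumption for illustration:

```python
# Bypass __init__ so no Airflow configuration is required for the demo
executor = object.__new__(CustomBatchExecutor)
executor.submit_job_kwargs = {'containerOverrides': {}}  # faked base kwargs

api = executor._submit_job_kwargs(
    'fake_task_key',                                     # stand-in task key
    ['airflow', 'run', 'my_dag', 'my_task', '2021-01-01'],
    'long_tasks_queue',
    {},
)
assert api['retryStrategy'] == {'attempts': 3}           # queue override fired
assert api['containerOverrides']['command'][0] == 'airflow'
```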
````diff
@@ -244,6 +254,17 @@ CUSTOM_RUN_TASK_KWARGS['overrides']['containerOverrides'][0]['environment'] = [
 ]
 ```
 
+"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"
+
+```python
+class CustomFargateExecutor(AwsFargateExecutor):
+    def _run_task_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
+        run_task_api = super()._run_task_kwargs(task_id, cmd, queue, exec_config)
+        if queue == 'long_tasks_queue':
+            # ECS's run_task accepts no retryStrategy/timeout (those are Batch-only),
+            # so tweak a valid run_task kwarg instead, e.g. the container environment
+            run_task_api['overrides']['containerOverrides'][0].setdefault(
+                'environment', []).append({'name': 'LONG_RUNNING_TASK', 'value': 'true'})
+        return run_task_api
+```
 
 ## Issues & Bugs
 Please file a ticket in GitHub for issues. Be persistent and be polite.
````
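
For reference alongside the Fargate example above: ECS's `run_task` exposes a different parameter surface than Batch's `submit_job` (no `retryStrategy` or `timeout`). A minimal sketch of a direct call; every identifier below is an illustrative placeholder, not this project's defaults:

```python
import boto3

ecs = boto3.client('ecs')
response = ecs.run_task(
    cluster='my-airflow-cluster',              # illustrative placeholder
    launchType='FARGATE',
    taskDefinition='airflow-task-def:1',       # illustrative placeholder
    count=1,
    overrides={'containerOverrides': [{
        'name': 'airflow-container',           # must match the task definition
        'command': ['airflow', 'run', 'my_dag', 'my_task', '2021-01-01'],
    }]},
    networkConfiguration={'awsvpcConfiguration': {
        'subnets': ['subnet-0123456789abcdef0'],   # illustrative placeholder
        'assignPublicIp': 'ENABLED',
    }},
)
print(response['tasks'][0]['taskArn'])
```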
