// LESSON 1.1 - DAGS: collection of all the tasks you want to run
definition
- consist of vertices, or nodes, & directed edges that connect those nodes
- have a direction (->) and do not contain cycles
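scope: airflow will load any dag object it can import from a dag file, as long as the object is defined at module level (in globals()); a dag created only inside a function's local scope is not picked up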
context managers: dags can be used as context managers to automatically assign new operators to that dag
part of dags
nodes: steps in the data pipeline process
edges: dependencies or relationships between nodes
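The "directed, no cycles" idea can be pictured with a small standard-library sketch. This is not Airflow code: the task names are made up, and `graphlib` (Python 3.9+) stands in for whatever Airflow does internally.

```python
from graphlib import TopologicalSorter, CycleError

# Each key is a node (task); its set holds the nodes it depends on (incoming edges).
# The task names are hypothetical.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# static_order() yields the nodes so that every dependency comes before its dependents.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Adding an edge from "load" back to "extract" creates a cycle, so the graph
# is no longer a DAG and no valid run order exists.
cyclic = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The first graph has exactly one valid order; the second is rejected because a cycle leaves no task that can go first.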
// LESSON 1.2 - OPERATORS: describes a single task in your data pipeline
bitshift composition: sets the order in which tasks run (which task is upstream of which)
<< - equal to set_upstream()
>> - equal to set_downstream()
example: task1 runs first, then task2, then task3
task1 >> task2 >> task3
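How `>>` and `<<` can map onto set_downstream() and set_upstream() is easier to see with a toy class. `ToyTask` below is a hypothetical stand-in, not Airflow's operator class; it only tracks dependencies.

```python
class ToyTask:
    """Tiny stand-in for a task; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []    # tasks that must finish before this one
        self.downstream = []  # tasks that run after this one

    def set_downstream(self, other):
        self.downstream.append(other)
        other.upstream.append(self)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b  means  a.set_downstream(b)
        self.set_downstream(other)
        return other  # returning the right-hand task lets a >> b >> c chain

    def __lshift__(self, other):
        # a << b  means  a.set_upstream(b)
        self.set_upstream(other)
        return other


task1, task2, task3 = ToyTask("task1"), ToyTask("task2"), ToyTask("task3")
task1 >> task2 >> task3

print([t.task_id for t in task2.upstream])
print([t.task_id for t in task2.downstream])
```

Returning the right-hand operand is the design choice that makes chaining work: `task1 >> task2` evaluates to `task2`, which is then shifted onto `task3`.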
action: perform actions | e.g. BashOperator, PythonOperator, EmailOperator
bash operator example
example_task = BashOperator(
    task_id='bash_example',          # name that shows up in the ui
    bash_command='echo "Example!"',  # could also be bash_command='runcleanup.sh'
    dag=ml_dag                       # dag it belongs to
)
transfer: move data from one system to another | e.g. S3ToRedshiftOperator, S3FileTransformOperator, etc.
postgres operator example (runs a sql statement against postgres, so it is closer to an action operator than a transfer)
example_task = PostgresOperator(
    task_id='PostgresOperator',
    sql='CREATE TABLE my_table (my_column varchar(10));',
    postgres_conn_id='my_postgres_connection',
    autocommit=False
)
sensors: wait for a certain condition to be true before downstream tasks start executing
file sensor example
file_sensor_task = FileSensor(  # a sensor (special task) used for checking the existence of a file
    task_id="file_sense",
    filepath="salesdata.csv",   # look for this filename to exist before continuing
    poke_interval=300,          # repeat the check every 300 seconds or 5 minutes
    dag=sales_report_dag
)
// LESSON 1.3 - TASKS, TASKS INSTANCES, AND WORKFLOWS
task: once an operator is instantiated, it is referred to as a "task", one operator is equal to one task
task instances: represents a specific run of a task and is characterized as the combination of a dag, a task, and a point in time | has indicative state which would be "running", "success", "failed", "skipped", "up for retry", etc.
some definitions
DAG: a description of the order in which work should take place
Operator: a class that acts as a template for carrying out some work
Task: an operator that has been instantiated with its parameters; a node in the DAG
Task Instance: a task that has been assigned to a DAG and has a state associated with a specific run of the DAG
task workflow
operator
-> task instance
-> worker
-> database(regularly updates the status of the task)
-> scheduler(regularly polls the status of the task)
-> signals executor to add the new task
-> sends heartbeat to start the task
-> executor(sequential/celery)
// LESSON 2.1 - HOOKS
definition
- interfaces to external sources, reached via "connections"
- can also connect to file sources
- used for creating connections to postgresql, sqlite, aws s3, etc.
UNFINISHED - example of hooks
# step 1: create a postgres connection at airflow ui
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id='postgres_bigishdata')
// LESSON 2.2 - POOLS: control task parallelism in airflow by limiting how many tasks run at the same time
example of pool
# step 1 - create a pool thru airflow ui | admin -> pools -> add a new record
# assigning task to a pool
# when tasks are assigned to a pool, they will be scheduled as normal until all of the pool's slots are filled
# you can control which tasks within the pool are run first by assigning priority weights
task_a = PythonOperator( # each task can only be assigned to a single pool
task_id='task_a',
python_callable=sleep_function,
pool='single_task_pool'
)
task_b = PythonOperator(
task_id='task_b',
python_callable=sleep_function,
pool='single_task_pool',
priority_weight=2
)
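The slot and priority behaviour described in the comments above can be pictured with a small model. This is a simplified sketch, not Airflow's scheduler: `pick_tasks_for_pool` is a hypothetical helper, and task_a is given a weight of 1 because that is the default priority_weight.

```python
def pick_tasks_for_pool(queued, free_slots):
    """Return the task ids that get a slot, highest priority_weight first.

    queued: list of (task_id, priority_weight) tuples waiting on the pool.
    free_slots: number of open slots in the pool.
    """
    by_priority = sorted(queued, key=lambda item: item[1], reverse=True)
    return [task_id for task_id, _ in by_priority[:free_slots]]


# A pool with a single slot and both tasks waiting.
queued = [("task_a", 1), ("task_b", 2)]
running = pick_tasks_for_pool(queued, free_slots=1)
print(running)
```

With one slot, only the higher-weight task_b gets to run; task_a waits until the slot frees up.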
// LESSON 2.3 - CONNECTIONS: purpose of connection is for airflow to know how to connect to your environment
definition
- to create a connection to let's say postgres, you need to use hooks
access connections
airflow ui -> menu -> admin -> connections
// UNFINISHED - LESSON 2.4 - QUEUES
// UNFINISHED - LESSON 2.5 - XCOMS: lets tasks exchange messages
methods
xcom_push - send
xcom_pull - receive
code example
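A minimal sketch of the push/pull pattern follows. The two callables are written the way they might look in a DAG, but `FakeTaskInstance` is a hypothetical stand-in for Airflow's task instance so the snippet runs on its own; the real methods accept more parameters, and the task ids and key are made up.

```python
class FakeTaskInstance:
    """Stand-in for a task instance; stores XComs in a shared dict."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key, value):
        # store the value under (pushing task, key)
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # look up what the named task pushed under this key
        return self._store.get((task_ids, key))


def push_row_count(ti):
    # "send": publish a value under a key for other tasks to read
    ti.xcom_push(key="row_count", value=42)


def read_row_count(ti):
    # "receive": fetch the value pushed by the task with id "push_task"
    return ti.xcom_pull(task_ids="push_task", key="row_count")


store = {}
push_row_count(FakeTaskInstance("push_task", store))
result = read_row_count(FakeTaskInstance("pull_task", store))
print(result)
```

In a real DAG the task instance would come from the context that Airflow hands to the callable, and the values would be stored in Airflow's metadata database rather than a dict.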
// LESSON 2.6 - VARIABLE: generic way to store and retrieve arbitrary content or settings as a simple key value store within airflow
creating variable
admin -> variables
example grabbing value of variable
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True) # bar is stored as a json string; deserialize_json=True parses it into a python object (e.g. a dictionary)
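What deserialize_json does can be imitated with the standard library. This is a toy lookup, not Airflow's Variable class: `stored_variables`, `get_variable`, and the stored values are all made up for illustration.

```python
import json

# Hypothetical stored values; variable values are kept as strings.
stored_variables = {
    "foo": "plain text",
    "bar": '{"bucket": "sales", "retries": 3}',
}


def get_variable(key, deserialize_json=False):
    """Toy lookup that imitates the deserialize_json switch."""
    raw = stored_variables[key]
    return json.loads(raw) if deserialize_json else raw


foo = get_variable("foo")                         # stays a string
bar = get_variable("bar", deserialize_json=True)  # becomes a dict
print(type(foo).__name__, type(bar).__name__)
print(bar["retries"])
```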
// LESSON 2.7 - BRANCHING: simple way to implement if-then-else logic in airflow
visualizing branching
task -> branch_task -> branch_a -> task_a
-> branch_b -> task_b
example code of branching
import random  # needed by branch_func
# the callables accept **kwargs because provide_context=True passes airflow's context as keyword arguments
def branch_func(**kwargs):
    if random.random() < 0.5:
        return "say_hi_task"   # run this task if condition is true
    return "say_hello_task"    # else, run this
def print_hi(**kwargs):
    print("hi")
def print_hello(**kwargs):
    print("hello")
with dag:  # dag and push_to_xcom are assumed to be defined elsewhere
    run_this_task = PythonOperator(
        task_id="run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    branch_op = BranchPythonOperator(
        task_id="branch_task",
        provide_context=True,
        python_callable=branch_func
    )
    run_this_task_2 = PythonOperator(
        task_id="say_hi_task",
        python_callable=print_hi,  # python function this task would execute
        provide_context=True
    )
    run_this_task_3 = PythonOperator(
        task_id="say_hello_task",
        python_callable=print_hello,
        provide_context=True
    )
run_this_task >> branch_op >> [run_this_task_2, run_this_task_3]
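The core of branching is that the callable returns the task_id to follow and the other candidates are skipped. The sketch below models only that rule in plain Python; `resolve_branch` is a hypothetical helper, and a seeded random generator is passed in to keep the demo repeatable.

```python
import random


def branch_func(rng):
    # Same rule as the notes: pick one task id based on a random draw.
    if rng.random() < 0.5:
        return "say_hi_task"
    return "say_hello_task"


def resolve_branch(chosen, candidates):
    """Map each candidate task id to 'run' or 'skipped'."""
    return {task_id: ("run" if task_id == chosen else "skipped") for task_id in candidates}


candidates = ["say_hi_task", "say_hello_task"]
chosen = branch_func(random.Random(0))
states = resolve_branch(chosen, candidates)
print(states)
```

Exactly one of the two downstream tasks ends up in the "run" state, whichever way the draw goes.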
// LESSON 2.8 - EXECUTORS: works closely with the scheduler to figure out what resources will actually complete those tasks
local executor: ideal for testing
- runs on a single system
- treats task as processes
- parallelism defined by the user
- can utilize all resources of a given host system
celery executor: go with celery for running DAGs in production, especially if you're running anything that's time sensitive
- uses a celery backend as task manager
- multiple worker systems can be defined
- significantly more difficult to setup and configure
- extremely powerful method for organizations with extensive workflows
kubernetes executor: runs each task in its own kubernetes pod; worth considering if you're already familiar with kubernetes
sequential executor: runs a single task instance at a time in a linear fashion with no parallelism functionality (A -> B -> C)
- default airflow executor
- runs one task at a time
- useful for debugging
- while functional, not really recommended for production
- not recommended for any use cases that require more than a single task execution at a time
// LESSON 2.9 - SCHEDULER: monitors all tasks and dags, then triggers the task instance once their dependencies are complete
// UNFINISHED - LESSON 3.1 - SUBDAGS: perfect for repeating patterns
definition
- used to minimize repetitive patterns
- avoid subdags if the only goal is to group tasks within a dag's graph view - airflow has task groups, which serve this purpose without the performance and functional issues of subdags
sample code of subdag
# in order to create a subdag you have to use a factory function that returns a dag object(the subdag in our case) and the subdagoperator to attach the subdag to the main dag
# airflow ui only shows the main dag. in order to see your subdags, you will have to click on the related main dag and then "zoom in" into the subdags from the graph view
# subdags must be scheduled the same as their parent dag
def load_subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        for i in range(5):
            t = DummyOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                dag=dag_subdag,
            )
    return dag_subdag
load_tasks = SubDagOperator(
    task_id="load_tasks",
    subdag=load_subdag(
        parent_dag_name="example_subdag_operator",
        child_dag_name="load_tasks",
        args=default_args
    ),
    default_args=default_args,
    dag=dag,
)
// UNFINISHED - LESSON 3.2 - TASK GROUPS
// LESSON 3.3 - SLA (time by which a task or dag should have succeeded) AND ALERTING
defining sla
to define an sla for a single task - set it on the operator
task1 = BashOperator(
    task_id='sla_task',
    bash_command='runcode.sh',
    sla=timedelta(seconds=30),
    dag=dag
)
to define an sla for all of the tasks - set it in the dag's default_args
default_args = {
    'sla': timedelta(minutes=20),
    'start_date': datetime(2020, 2, 20)
}
dag = DAG('sla_dag', default_args=default_args)
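The sla value is a timedelta, so an sla miss boils down to a time comparison. The sketch below is a simplified model, assuming the clock starts at a reference time you pass in; which reference time Airflow itself uses is not covered in these notes, and `sla_missed` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def sla_missed(reference_time, finished_at, sla):
    """True when the task finished later than reference_time + sla."""
    return finished_at > reference_time + sla


reference = datetime(2020, 2, 20, 0, 0)
sla = timedelta(minutes=20)

missed_when_quick = sla_missed(reference, datetime(2020, 2, 20, 0, 15), sla)
missed_when_slow = sla_missed(reference, datetime(2020, 2, 20, 0, 25), sla)
print(missed_when_quick, missed_when_slow)
```

A task finishing after 15 minutes is inside the 20-minute sla; one finishing after 25 minutes counts as a miss.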
alerting
general reporting
- options for success/failure/error
default_args = {
    # used for sending messages on success/failure/error
    'email': ['airflowresults@datacamp.com'],  # address the messages are sent to
    'email_on_failure': False,                 # whether to send an email on failure
    'email_on_retry': False,                   # whether to send an email on retry
    'retries': 3,                              # how many retries
    'email_on_success': True,                  # whether to send an email on success
    ...}
// LESSON 3.4 - TRIGGER RULES
definition
- although the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, airflow allows for more complex dependency settings
- all operators have a default trigger rule of all_success - meaning every directly upstream task must succeed before the current task executes
trigger rules values
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
none_failed: no parent has failed (failed or upstream_failed), i.e. all parents have succeeded or been skipped
none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
dummy: dependencies are just for show, trigger at will
sample code
join = DummyOperator(task_id='join', dag=dag, trigger_rule='none_failed')
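The rule table above can be turned into a small evaluator to check your understanding. This is a sketch built from the descriptions in these notes, not Airflow's own dependency code; `should_trigger` and the state strings are illustrative.

```python
FAILED_STATES = {"failed", "upstream_failed"}


def should_trigger(rule, parent_states):
    """Decide whether a task may fire, given its parents' states so far.

    parent_states has one entry per parent: "success", "failed",
    "upstream_failed", "skipped", or None for a parent that is not done yet.
    """
    done = [s for s in parent_states if s is not None]
    all_done = len(done) == len(parent_states)
    if rule == "all_success":
        return all_done and all(s == "success" for s in done)
    if rule == "all_failed":
        return all_done and all(s in FAILED_STATES for s in done)
    if rule == "all_done":
        return all_done
    if rule == "one_failed":
        # fires early: does not wait for the remaining parents
        return any(s in FAILED_STATES for s in done)
    if rule == "one_success":
        # fires early: does not wait for the remaining parents
        return any(s == "success" for s in done)
    if rule == "none_failed":
        return all_done and not any(s in FAILED_STATES for s in done)
    if rule == "none_skipped":
        return all_done and not any(s == "skipped" for s in done)
    if rule == "dummy":
        return True
    raise ValueError(f"unknown trigger rule: {rule}")


# A join after a branch: one parent ran, the other was skipped.
after_branch = ["success", "skipped"]
print(should_trigger("all_success", after_branch))
print(should_trigger("none_failed", after_branch))
```

This is why a join task after a branch is often given none_failed: with the default all_success, the skipped branch would keep the join from running.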
// UNFINISHED - LESSON 3.5 - LATEST RUN ONLY
definition
- LatestOnlyOperator is used to skip tasks that are not being run during the most recent scheduled run for a dag
- skips all downstream tasks, if the time right now is not between its execution_time and the next scheduled execution_time
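The time-window rule can be written as one comparison. The sketch follows the wording of these notes; the real operator derives its window from the dag's schedule and its exact boundaries may differ, so treat `is_latest_run` as a hypothetical illustration.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_time, schedule_interval, now):
    """True when `now` lies between this run's execution_time and the next one."""
    next_execution_time = execution_time + schedule_interval
    return execution_time <= now <= next_execution_time


daily = timedelta(days=1)
run_time = datetime(2021, 1, 1)

# Checked half a day later: still the latest run, downstream tasks proceed.
current = is_latest_run(run_time, daily, now=datetime(2021, 1, 1, 12))
# Checked two days later (e.g. during a backfill): downstream tasks are skipped.
stale = is_latest_run(run_time, daily, now=datetime(2021, 1, 3))
print(current, stale)
```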
// UNFINISHED - LESSON 3.6 - ZOMBIES AND UNDEADS: you must understand task workflow first, have a look at lesson 1.3
zombie task
- a task the database still shows as running but that no longer sends a heartbeat, e.g. because the worker lost its connection or the worker process was killed unexpectedly
undead task
- a process exists and sends a matching heartbeat, but airflow isn't aware of this task as running in the database
more info about these
// LESSON 3.7 - CLUSTER POLICY
definition
- your local airflow settings file can define a policy function that has the ability to mutate task attributes based on other task or dag attributes
example code
# this function could apply a specific queue property when using a specific operator, or enforce a task timeout policy, making sure that no tasks run for more than 48 hours
def policy(task):
    if task.__class__.__name__ == 'HivePartitionSensor':
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)
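To see the policy act on a task, it can be run against a stub object. The class below is a bare stand-in that only shares its name with the real sensor and carries just the two attributes the policy touches; the starting values are made up.

```python
from datetime import timedelta


class HivePartitionSensor:
    """Bare stand-in with only the attributes the policy touches."""

    def __init__(self, timeout):
        self.queue = "default"
        self.timeout = timeout


def policy(task):
    # Route this sensor type to a dedicated queue.
    if task.__class__.__name__ == "HivePartitionSensor":
        task.queue = "sensor_queue"
    # Cap the timeout at 48 hours.
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)


sensor = HivePartitionSensor(timeout=timedelta(hours=72))
policy(sensor)
print(sensor.queue, sensor.timeout)
```

The policy mutates the task in place, so nothing needs to be returned.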
// UNFINISHED - LESSON 3.8 - JINJA TEMPLATING: templating allows users to "fill in the blank" with important runtime variables for tasks
variables: airflow exposes default (context) variables such as ds; with provide_context = True they are passed to a python callable as keyword arguments
macros
templating
example templating (using default airflow variables)
import datetime
def log_details(*args, **kwargs):  # with provide_context=True, airflow passes its default variables in kwargs
    print(kwargs['ds'])  # e.g. the execution date as YYYY-MM-DD
dag = DAG(
    "lezgo.demo",
    schedule_interval="@daily",
    start_date=datetime.datetime.now()  # a fixed start_date is generally recommended over now()
)
list_task = PythonOperator(
    task_id="task1",
    python_callable=log_details,  # function we're calling; no arguments are passed here even though the function has parameters
    dag=dag,
    provide_context=True          # passes the default airflow variables to the function assigned to python_callable
)
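The "fill in the blank" step can be imitated with a few lines of standard-library code. Airflow renders templates with Jinja; the regex-based `render` below is a simplified stand-in that handles only plain `{{ name }}` and `{{ a.b }}` placeholders, and the execution date is made up.

```python
import re
from datetime import date


def render(template, context):
    """Replace each {{ name }} or {{ a.b }} placeholder with a value from context."""

    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # walk nested dicts for dotted names
        return str(value)

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)


run_date = date(2019, 11, 6)  # hypothetical execution date
context = {
    "ds": run_date.strftime("%Y-%m-%d"),
    "ds_nodash": run_date.strftime("%Y%m%d"),
    "params": {"filename": "file1.txt"},
}
command = render("bash cleandata.sh {{ ds_nodash }} {{ params.filename }}", context)
print(command)  # bash cleandata.sh 20191106 file1.txt
```

The same template produces a different command on every run because the context changes, which is the main reason to template instead of hard-coding dates.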
// UNFINISHED - LESSON 3.9 - PACKAGED DAGS
// UNFINISHED - LESSON 3.10 - .AIRFLOWIGNORE
// UNFINISHED - LESSON 3.11 - SCHEDULING AND TRIGGERS
dag runs
backfill and catchup
external triggers
// LESSON X.0 - SAMPLE AIRFLOW USAGE - applying what we learned
from datetime import datetime
from datetime import timedelta # for using timedelta
from airflow.operators.bash_operator import BashOperator # for creating bash operator
from airflow.operators.python_operator import PythonOperator # for creating python operator
from airflow.operators.email_operator import EmailOperator # for creating email operator
from airflow.contrib.sensors.file_sensor import FileSensor # for creating special operator(sensor) file sensor
from airflow.models import DAG # for creating dags
# creating argument for dag
default_args = {
'owner': 'Engineering', # owner of dag
'start_date': datetime(2019, 11, 1), # when the dag would start
# used for sending messages for success/failure/error or general reporting in short for the sla
'email': ['airflowresults@datacamp.com'], # email we're gonna send the success/failure/error
'email_on_failure': False, # specify if we would send email at failure
'email_on_retry': False, # specify if we would send email at retry
'email_on_success': True, # specify if we woudl send an email at success
'retries': 3, # how many retries
'retry_delay': timedelta(minutes=20),
'sla': timedelta(minutes=30) # amount of time a dag should run, we could also specify this to a task | if run time is exactly or below the specified sla then it's successful
}
# creating dag
codependency_dag = DAG(
    'codependency',                  # dag id (name shown in the ui)
    default_args=default_args,       # arguments applied to every task in the dag
    schedule_interval='30 12 * * 3'  # cron schedule: 12:30 every Wednesday | a timedelta object or a preset (e.g. @daily, @hourly) also works
)
# creating tasks or operators, remember each task runs once per dag run
bash_task = BashOperator(       # bashoperator - for running shell scripts
    task_id='first_task',       # name of the task
    bash_command='echo 1',      # what the task does, can also be a shell script
    dag=codependency_dag,       # what dag it belongs to
    sla=timedelta(minutes=30)   # time by which this task should have finished; if it takes longer, an email goes to the address specified in default_args
)
python_task = PythonOperator( # pythonoperator - for running python functions
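    task_id='simple_print',   # name of task
    python_callable=printme,  # function to call (printme is assumed to be defined elsewhere); pass it without parentheses
    provide_context=True,     # passes airflow's context variables to the callable as keyword arguments, so printme must accept **kwargs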
dag=codependency_dag # dag it belongs to
)
# email operator could take advantage of templated commands but we won't do it here
email_task = EmailOperator( # emailoperator - for sending emails
task_id='email_sales_report', # name of task
to='sales_manager@example.com', # where you're gonna send the email
subject='Automated Sales Report', # obvs the subject
html_content='Attached is the latest sales report', # obvs the content
    files=['latest_sales.xlsx'],  # list of files you want to attach
    dag=codependency_dag          # dag it belongs to
)
# sensor - can be used to repeat task without the need of loop | poke_interval makes it possible, operator doesn't have this
file_sensor_task = FileSensor( # a sensor(special task) used for checking the existence of a file
task_id="file_sense", # name of task
filepath="salesdata.csv", # look for this filename to exist before continuing
poke_interval=300, # repeat the check every 300 seconds or 5 minutes
dag=codependency_dag # dag it belongs to
)
# creating a templated command - values wrapped in double curly braces are substituted at run time
# this one uses ds_nodash and params.filename
# there are more advanced templates but we'll stick with the common ones | an advanced template could loop inside a single task
# ds_nodash is a default airflow variable: the execution date as YYYYMMDD
templated_command = """
bash cleandata.sh {{ ds_nodash }} {{ params.filename }}
"""
t1 = BashOperator(
    task_id='template_task',           # name of task
    bash_command=templated_command,    # bash command to execute
    params={'filename': 'file1.txt'},  # replaces params.filename in the templated command
    dag=codependency_dag               # dag it belongs to
)
# i didn't apply branching in here but just have a look at your lesson
# creating task dependency or flow of the task - remember it cannot be in a loop
# in here we used downstream (>>), which means bash_task has to finish before python_task starts
bash_task >> python_task >> email_task >> file_sensor_task
# to check which executor is in use, type in terminal: airflow list_dags (on airflow 1.x the log output names the executor)
// LESSON X.1 - DATA QUALITY CHECKS
definition
- data quality is key to the success of an organization's data systems
- with in-dag quality checks, you can halt pipelines and alert stakeholders before bad data makes its way to a production lake or warehouse
sql check operator: returns a single row from a provided sql query and checks to see if any of the returned values in that row are false, if any values are false, the task fails
SQLCheckOperator(
task_id="yellow_tripdata_row_quality_check",
sql="row_quality_yellow_tripdata_check.sql",
params={"pickup_datetime": "2021-01-01"},
)
sql value check operator: simple operator that is useful when a specific, known value is being checked, either as an exact value or within a percentage tolerance
SQLValueCheckOperator(
    task_id="check_row_count",
    sql="SELECT COUNT(*) FROM yellow_tripdata",
    pass_value=20000,
    tolerance=0.01  # accept values within 1% of pass_value
)
sql interval check operator: time based operator, useful for checking current data against historical data
SQLIntervalCheckOperator(
    task_id="check_interval_data",
    table="yellow_tripdata",  # required argument; table name assumed from the other examples
    days_back=1,
    date_filter_column="upload_date",
    metrics_thresholds={"AVG(trip_distance)": 1.5}
)
sql threshold check operator: works like sql value check operator but instead of a single threshold, there is a min and max given
SQLThresholdCheckOperator(
task_id="check_threshold",
sql="SELECT MAX(passenger_count)",
min_threshold=1,
max_threshold=8
)
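The pass/fail rules behind these checks are simple comparisons. The functions below are a plain-Python sketch of those rules as described in these notes, with made-up query results; they are not the operators' actual implementation, and treating the 0.01 as a fraction of pass_value is an assumption.

```python
def row_check(row):
    """sql check rule: fail if any value in the single returned row is falsy."""
    return all(bool(value) for value in row)


def value_check(actual, pass_value, tolerance=None):
    """Pass on an exact match, or within +/- tolerance (a fraction of pass_value)."""
    if tolerance is None:
        return actual == pass_value
    low = pass_value * (1 - tolerance)
    high = pass_value * (1 + tolerance)
    return low <= actual <= high


def threshold_check(actual, min_threshold, max_threshold):
    """Pass when the value lies between the min and max thresholds."""
    return min_threshold <= actual <= max_threshold


print(row_check([1, True, 0]))           # a 0 in the row fails the check
print(value_check(20150, 20000, 0.01))   # within 1% of 20000
print(threshold_check(9, 1, 8))          # 9 passengers exceeds the max of 8
```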
// LESSON X.2 - OPERATORS TO WATCH OUT FOR
bash operator: for running bash scripts
example_task = BashOperator(
    task_id='bash_example',          # name that shows up in the ui
    bash_command='echo "Example!"',  # could also be bash_command='runcleanup.sh'
    dag=ml_dag                       # dag it belongs to
)
python operator: for running python functions
python_task = PythonOperator(
    task_id='simple_print',   # name of task
    python_callable=printme,  # function to call; pass it without parentheses
    dag=example_dag           # dag it belongs to
)
postgres operator: running sql statement against a postgres database
t3 = PostgresOperator(
task_id='PostgresOperator',
sql='CREATE TABLE my_table (my_column varchar(10));',
postgres_conn_id='my_postgres_connection',
autocommit=False
)
ssh operator
t4 = SSHOperator(
task_id='SSHOperator',
ssh_conn_id='my_ssh_connection',
command='echo "Hello from SSH Operator"'
)
email operator: for sending emails
email_task = EmailOperator(
    task_id='email_sales_report',                        # name of task
    to='sales_manager@example.com',                      # where you're gonna send the email
    subject='Automated Sales Report',                    # the subject
    html_content='Attached is the latest sales report',  # the content
    files=['latest_sales.xlsx'],                         # list of files you want to attach
    dag=example_dag
)
dummy operator: does literally nothing | can be used to group tasks in a dag
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)
s3 to redshift operator:
t5 = S3ToRedshiftOperator(
task_id='S3ToRedshift',
schema='public',
table='my_table',
s3_bucket='my_s3_bucket',
s3_key='{{ ds_nodash }}/my_file.csv',
redshift_conn_id='my_redshift_connection',
aws_conn_id='my_aws_connection'
)
// LESSON X.3 - SENSORS TO WATCH OUT
file sensor
from airflow.contrib.sensors.file_sensor import FileSensor
file_sensor_task = FileSensor(  # a sensor (special task) used for checking the existence of a file
    task_id="file_sense",
    filepath="salesdata.csv",   # look for this filename to exist before continuing
    poke_interval=300,          # repeat the check every 300 seconds or 5 minutes
    dag=sales_report_dag
)
other sensors
S3KeySensor: waits for a key (file) to land in an s3 bucket
PythonSensor: waits for a python callable(function) to return True
DateTimeSensor: waits for a specified date and time to pass
ExternalTaskSensor: wait for a task in another DAG to complete
HttpSensor: request a web url and check for content
SqlSensor: runs a sql query to check for content
and many others in airflow.sensors and airflow.contrib.sensors
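All of these sensors share the same idea: check a condition, wait poke_interval seconds, and check again until it holds or a timeout passes. The loop below is a simplified sketch of that idea, not Airflow's sensor code; `poke_until` is a hypothetical helper, and the sleep function is injectable so the demo finishes instantly.

```python
import time


def poke_until(condition, poke_interval, timeout, sleep=time.sleep):
    """Call condition() every poke_interval seconds until it returns True.

    Returns True on success, or False once timeout seconds have been waited.
    """
    waited = 0
    while True:
        if condition():
            return True
        if waited >= timeout:
            return False
        sleep(poke_interval)
        waited += poke_interval


# Example: the "file" shows up on the third check.
checks = []


def file_exists():
    checks.append(1)
    return len(checks) >= 3


found = poke_until(file_exists, poke_interval=300, timeout=3600, sleep=lambda seconds: None)
print(found, len(checks))
```

This is also why a sensor can replace a hand-written loop: the repetition lives in the sensor, not in the dag.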
// LESSON X.4 - SUMMARY OF EVERYTHING YOU NEED TO KNOW
dags: flow. it consists of nodes(task) and line(relationship - connects tasks to create a flow)
operators,tasks, and downstream/upstream
operators: used to create a flow | you could create customized operator too
tasks: each operator is a task
upstream/downstream: a >> b is downstream meaning b must execute after a, vice versa for upstream
hooks and connections
hooks: interfaces (often used inside operators) for connecting to an external system, let's say postgres or s3
connections: connection to other things outside airflow e.g. postgres
templating and default variables
templating: filling in values at run time using your own params or airflow's default variables
default variables: specify provide_context = True to be able to use default variables from airflow in a python callable
scheduling: setting when a dag starts and how often it runs
sla: send an email when the task exceeded the expected time of execution
executor
important concepts
backfill
data partitioning: separating flows within a dag, in this way we could achieve parallelism | you could separate data based on time, concept, or data size
data quality: adding a task that checks whether our data is good or not
// LESSON X.X - CORE IDEAS IN AIRFLOW
dags
scope
default arguments
context manager
operators
dag assignment
bitshift composition
tasks
task instances
workflows
hooks
pools
connections
queues
xcoms
variable
branching
subdags
sla
trigger rules
latest run only
zombies and undeads
cluster policy
jinja templating
packaged dags
.airflowignore