From 3327e511b74c48dd0791b8eb8e7c5dbb37d42100 Mon Sep 17 00:00:00 2001
From: Cicero Moura
Date: Tue, 12 Oct 2021 21:41:00 -0300
Subject: [PATCH] Upgrade infrastructure and Airflow to version 2.0

---
 Dockerfile                                    | 106 +--
 LICENSE                                       |   1 +
 README.md                                     |  74 +-
 config/airflow.cfg                            | 854 ------------------
 config/entrypoint.sh                          |  47 +-
 config/requirements.txt                       |   4 +
 dags/DagFetcher.py                            |  27 +
 dags/my_first_dag.py                          |  64 --
 dags/my_second_dag.py                         |  52 --
 infrastructure/airflow_flower.tf              | 105 ---
 infrastructure/airflow_scheduler.tf           |  92 --
 .../airflow_services/airflow_flower.tf        |  54 ++
 .../airflow_services/airflow_scheduler.tf     |  54 ++
 .../airflow_services/airflow_web_server.tf    |  81 ++
 .../airflow_web_server_lb.tf                  |   2 +-
 .../airflow_services/airflow_workers.tf       |  58 ++
 infrastructure/{ => airflow_services}/ecs.tf  |  24 +-
 .../{ => airflow_services}/ecs_services.tf    |  14 +-
 infrastructure/airflow_services/efs.tf        |  73 ++
 .../{ => airflow_services}/network.tf         |   0
 .../{ => airflow_services}/output.tf          |   0
 .../{ => airflow_services}/postgres.tf        |   2 +-
 .../{ => airflow_services}/redis.tf           |   0
 .../task_definitions/airflow_services.json    |  77 ++
 infrastructure/airflow_services/variables.tf  |  49 +
 infrastructure/airflow_web_server.tf          | 132 ---
 infrastructure/airflow_workers.tf             | 105 ---
 infrastructure/main.tf                        |   3 +
 infrastructure/{config.tf => variables.tf}    |   0
 requirements.txt                              |   3 -
 scripts/deploy.sh                             |  22 +-
 scripts/push_to_ecr.sh                        |  20 -
 32 files changed, 593 insertions(+), 1606 deletions(-)
 delete mode 100644 config/airflow.cfg
 create mode 100644 config/requirements.txt
 create mode 100644 dags/DagFetcher.py
 delete mode 100644 dags/my_first_dag.py
 delete mode 100644 dags/my_second_dag.py
 delete mode 100644 infrastructure/airflow_flower.tf
 delete mode 100644 infrastructure/airflow_scheduler.tf
 create mode 100644 infrastructure/airflow_services/airflow_flower.tf
 create mode 100644 infrastructure/airflow_services/airflow_scheduler.tf
 create mode 100644 infrastructure/airflow_services/airflow_web_server.tf
 rename infrastructure/{ => airflow_services}/airflow_web_server_lb.tf (97%)
 create mode 100644 infrastructure/airflow_services/airflow_workers.tf
 rename infrastructure/{ => airflow_services}/ecs.tf (83%)
 rename infrastructure/{ => airflow_services}/ecs_services.tf (91%)
 create mode 100644 infrastructure/airflow_services/efs.tf
 rename infrastructure/{ => airflow_services}/network.tf (100%)
 rename infrastructure/{ => airflow_services}/output.tf (100%)
 rename infrastructure/{ => airflow_services}/postgres.tf (98%)
 rename infrastructure/{ => airflow_services}/redis.tf (100%)
 create mode 100644 infrastructure/airflow_services/task_definitions/airflow_services.json
 create mode 100644 infrastructure/airflow_services/variables.tf
 delete mode 100644 infrastructure/airflow_web_server.tf
 delete mode 100644 infrastructure/airflow_workers.tf
 create mode 100644 infrastructure/main.tf
 rename infrastructure/{config.tf => variables.tf} (100%)
 delete mode 100644 requirements.txt
 delete mode 100755 scripts/push_to_ecr.sh

diff --git a/Dockerfile b/Dockerfile
index 13c8017..1b4dddc 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,109 +1,33 @@
-# BUILD: docker build --rm -t airflow .
-# ORIGINAL SOURCE: https://github.com/puckel/docker-airflow +FROM apache/airflow:2.1.3-python3.8 -FROM python:3.8.5-slim -LABEL version="1.1" -LABEL maintainer="nicor88" - -# Never prompts the user for choices on installation/configuration of packages -ENV DEBIAN_FRONTEND noninteractive -ENV TERM linux - -# Airflow -# it's possible to use v1-10-stable, but it's a development branch -ARG AIRFLOW_VERSION=1.10.11 ENV AIRFLOW_HOME=/usr/local/airflow -ENV AIRFLOW_GPL_UNIDECODE=yes -# celery config -ARG CELERY_REDIS_VERSION=4.2.0 -ARG PYTHON_REDIS_VERSION=3.2.0 - -ARG TORNADO_VERSION=5.1.1 -ARG WERKZEUG_VERSION=0.16.0 - -# Define en_US. -ENV LANGUAGE en_US.UTF-8 -ENV LANG en_US.UTF-8 -ENV LC_ALL en_US.UTF-8 -ENV LC_CTYPE en_US.UTF-8 -ENV LC_MESSAGES en_US.UTF-8 -ENV LC_ALL en_US.UTF-8 - -RUN set -ex \ - && buildDeps=' \ - python3-dev \ - libkrb5-dev \ - libsasl2-dev \ - libssl-dev \ - libffi-dev \ - build-essential \ - libblas-dev \ - liblapack-dev \ - libpq-dev \ - git \ - ' \ - && apt-get update -yqq \ - && apt-get upgrade -yqq \ - && apt-get install -yqq --no-install-recommends \ - ${buildDeps} \ - sudo \ - python3-pip \ - python3-requests \ - default-mysql-client \ - default-libmysqlclient-dev \ - apt-utils \ - curl \ - rsync \ - netcat \ - locales \ - && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \ - && locale-gen \ - && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \ - && useradd -ms /bin/bash -d ${AIRFLOW_HOME} airflow \ - && pip install -U pip setuptools wheel \ - && pip install --no-cache-dir pytz \ - && pip install --no-cache-dir pyOpenSSL \ - && pip install --no-cache-dir ndg-httpsclient \ - && pip install --no-cache-dir pyasn1 \ - && pip install --no-cache-dir typing_extensions \ - && pip install --no-cache-dir mysqlclient \ - && pip install --no-cache-dir apache-airflow[async,aws,crypto,celery,github_enterprise,kubernetes,jdbc,postgres,password,s3,slack,ssh]==${AIRFLOW_VERSION} \ - && pip install --no-cache-dir werkzeug==${WERKZEUG_VERSION} \ - && pip install --no-cache-dir redis==${PYTHON_REDIS_VERSION} \ - && pip install --no-cache-dir celery[redis]==${CELERY_REDIS_VERSION} \ - && pip install --no-cache-dir flask_oauthlib \ - && pip install --no-cache-dir SQLAlchemy==1.3.23 \ - && pip install --no-cache-dir Flask-SQLAlchemy==2.4.4 \ - && pip install --no-cache-dir psycopg2-binary \ - && pip install --no-cache-dir tornado==${TORNADO_VERSION} \ - && apt-get purge --auto-remove -yqq ${buildDeps} \ - && apt-get autoremove -yqq --purge \ - && apt-get clean \ - && rm -rf \ - /var/lib/apt/lists/* \ - /tmp/* \ - /var/tmp/* \ - /usr/share/man \ - /usr/share/doc \ - /usr/share/doc-base +USER root +#configs COPY config/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh -COPY config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg -COPY dags ${AIRFLOW_HOME}/dags +#COPY config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg + +#plugins COPY plugins ${AIRFLOW_HOME}/plugins -RUN chown -R airflow: ${AIRFLOW_HOME} +#initial dags +COPY dags /dags +RUN mkdir ${AIRFLOW_HOME}/dags -ENV PYTHONPATH ${AIRFLOW_HOME} +RUN chown -R airflow:airflow ${AIRFLOW_HOME} +RUN chmod 777 -R /dags USER airflow -COPY requirements.txt . +#requirements +COPY config/requirements.txt . 
 RUN pip install --user --no-cache-dir -r requirements.txt
+
 EXPOSE 8080 5555 8793
 WORKDIR ${AIRFLOW_HOME}
 ENTRYPOINT ["/entrypoint.sh"]
+
diff --git a/LICENSE b/LICENSE
index cc38071..bc240f4 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,7 @@
 MIT License

 Copyright (c) 2017 nicor88
+Update: Copyright (c) 2021 cicerojmm

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/README.md b/README.md
index da200b8..90e2036 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,11 @@
-# airflow-ecs
+# Airflow 2.0 on AWS ECS Fargate
 Setup to run Airflow in AWS ECS containers

 ## Requirements

 ### Local
 * Docker
+* Docker Compose

 ### AWS
 * AWS IAM User for the infrastructure deployment, with admin permissions
@@ -34,48 +35,49 @@ Setup to run Airflow in AWS ECS containers
 If everything runs correctly you can reach Airflow navigating to [localhost:8080](http://localhost:8080).
 The current setup is based on [Celery Workers](https://airflow.apache.org/howto/executor/use-celery.html).
 You can monitor how many workers are currently active using Flower, visiting [localhost:5555](http://localhost:5555)

-## Deploy Airflow on AWS ECS
-To run Airflow in AWS we will use ECS (Elastic Container Service).
+## Deploy Airflow 2.0 on AWS ECS
+To run Airflow in AWS we will use ECS (Elastic Container Service) together with the following AWS components:
+* AWS ECS Fargate: runs all Airflow services (webserver, Flower, workers and scheduler);
+* ElastiCache (Redis): message broker for communication between the Airflow services;
+* RDS for PostgreSQL: the Airflow metadata database;
+* EFS: persistent storage for the Airflow DAGs;
+* ELB: Application Load Balancer for access to the Airflow webserver;
+* CloudWatch: logs for the container services and for Airflow task runs;
+* IAM: permissions that allow the ECS containers to reach the other services;
+* ECR: Docker image repository that stores the Airflow image.

 ### Deploy Infrastructure using Terraform
 Run the following commands:
-
-make infra-init
-make infra-plan
-make infra-apply
-
- -or alternatively -
-cd infrastructure
-terraform get
-terraform init -upgrade;
-terraform plan
-terraform apply
-
+Export the following environment variables:
+
+```sh
+export AWS_ACCOUNT=xxxxxxxxxxxxx
+export AWS_DEFAULT_REGION=us-east-1
+```
+Then build the infrastructure and push the Docker image:
+
+```sh
+bash scripts/deploy.sh airflow-dev
+```
 By default the infrastructure is deployed in `us-east-1`.
-When the infrastructure is provisioned (the RDS metadata DB will take a while) check the if the ECR repository is created then run:
-
-bash scripts/push_to_ecr.sh airflow-dev
-
-By default the repo name created with terraform is `airflow-dev`
-Without this command the ECS services will fail to fetch the `latest` image from ECR
+All Airflow services are started by `entrypoint.sh`, located in the `config` folder at the project root.
+It is parameterized by the `command` value that each ECS task definition passes to the container.
+
+## Troubleshooting
+
+If the Airflow containers fail to start with an error such as:
+`ResourceInitializationError: failed to invoke EFS utils commands to set up EFS volumes: stderr: b'mount.nfs4...`
-### Deploy new Airflow application
-To deploy an update version of Airflow you need to push a new container image to ECR.
-You can simply doing that running:
-
-./scripts/deploy.sh airflow-dev
-
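A hedged shell sketch of the manual EFS fix, matching the steps listed right after it; the file system ID `fs-12345678` is a placeholder, and the commands assume an Amazon Linux EC2 instance in the same VPC with the `amazon-efs-utils` mount helper installed:

```sh
# Mount the EFS file system on a helper EC2 instance (placeholder ID fs-12345678)
sudo yum install -y amazon-efs-utils
sudo mkdir -p /mnt/efs
sudo mount -t efs fs-12345678:/ /mnt/efs

# Create the folder structure the Airflow containers expect and open up permissions
sudo mkdir -p /mnt/efs/data/airflow
sudo chmod -R 777 /mnt/efs/data

# Detach when done
sudo umount /mnt/efs
```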
+You will need to mount the EFS on an EC2 instance and perform the following steps: -The deployment script will take care of: -* push a new ECR image to your repository -* re-deploy the new ECS services with the updated image +* mount the EFS on an EC2 in the same VPC; +* access EFS and create the **/data/airflow folder** structure; +* give full and recursive permission on the root folder, something like **chmod 777 -R /data**. +* with this the AIRflow containers will be able to access the volume and create the necessary folders; ## TODO -* Create Private Subnets -* Move ECS containers to Private Subnets -* Use ECS private Links for Private Subnets -* Improve ECS Task and Service Role +* refact terraform on best practices; +* use SSM Parameter Store to keep passwords secret; +* automatically update task definition when uploading a new Airflow version. diff --git a/config/airflow.cfg b/config/airflow.cfg deleted file mode 100644 index e79afac..0000000 --- a/config/airflow.cfg +++ /dev/null @@ -1,854 +0,0 @@ -[core] -# The folder where your airflow pipelines live, most likely a -# subfolder in a code repository -# This path must be absolute -dags_folder = /usr/local/airflow/dags - -# The folder where airflow should store its log files -# This path must be absolute -base_log_folder = /usr/local/airflow/logs - -# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. -# Users must supply an Airflow connection id that provides access to the storage -# location. If remote_logging is set to true, see UPDATING.md for additional -# configuration requirements. -remote_logging = False -remote_log_conn_id = -remote_base_log_folder = -encrypt_s3_logs = False - -# Logging level -logging_level = INFO -fab_logging_level = WARN - -# Logging class -# Specify the class that will specify the logging configuration -# This class has to be on the python classpath -# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG -logging_config_class = - -# Log format -# Colour the logs when the controlling terminal is a TTY. -colored_console_log = True -colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {{%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d}} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s -colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter - -# log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s -log_format = [%%(asctime)s] %%(levelname)s - %%(message)s -simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s - -# Log filename format -# we need to escape the curly braces by adding an additional curly brace -log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log -log_processor_filename_template = {{ filename }}.log -dag_processor_manager_log_location = /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log - -# Hostname by providing a path to a callable, which will resolve the hostname -hostname_callable = socket:getfqdn - -# Default timezone in case supplied date times are naive -# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) -default_timezone = Europe/Amsterdam - -# The executor class that airflow should use. Choices include -# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor -executor = CeleryExecutor - -# The SqlAlchemy connection string to the metadata database. 
-# SqlAlchemy supports many different database engine, more information -# their website -sql_alchemy_conn = postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB - -# The encoding for the databases -sql_engine_encoding = utf-8 - -# If SqlAlchemy should pool database connections. -sql_alchemy_pool_enabled = True - -# The SqlAlchemy pool size is the maximum number of database connections -# in the pool. 0 indicates no limit. -sql_alchemy_pool_size = 5 - -# The maximum overflow size of the pool. -# When the number of checked-out connections reaches the size set in pool_size, -# additional connections will be returned up to this limit. -# When those additional connections are returned to the pool, they are disconnected and discarded. -# It follows then that the total number of simultaneous connections the pool will allow is pool_size + max_overflow, -# and the total number of "sleeping" connections the pool will allow is pool_size. -# max_overflow can be set to -1 to indicate no overflow limit; -# no limit will be placed on the total number of concurrent connections. Defaults to 10. -sql_alchemy_max_overflow = 10 - -# The SqlAlchemy pool recycle is the number of seconds a connection -# can be idle in the pool before it is invalidated. This config does -# not apply to sqlite. If the number of DB connections is ever exceeded, -# a lower config value will allow the system to recover faster. -sql_alchemy_pool_recycle = 1800 - -# Check connection at the start of each connection pool checkout. -# Typically, this is a simple statement like “SELECT 1”. -# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic -sql_alchemy_pool_pre_ping = True - -# The schema to use for the metadata database -# SqlAlchemy supports databases with the concept of multiple schemas. -sql_alchemy_schema = - -# The amount of parallelism as a setting to the executor. This defines -# the max number of task instances that should run simultaneously -# on this airflow installation -parallelism = 32 - -# The number of task instances allowed to run concurrently by the scheduler -dag_concurrency = 32 - -# Are DAGs paused by default at creation -dags_are_paused_at_creation = True - -# When not using pools, tasks are run in the "default pool", -# whose size is guided by this config element -non_pooled_task_slot_count = 128 - -# The maximum number of active DAG runs per DAG -max_active_runs_per_dag = 1 - -# Whether to load the examples that ship with Airflow. 
It's good to -# get started, but you probably want to set this to False in a production -# environment -load_examples = False - -# Where your Airflow plugins are stored -plugins_folder = /usr/local/airflow/plugins - -# Secret key to save connection passwords in the db -fernet_key = $FERNET_KEY - -# Whether to disable pickling dags -donot_pickle = False - -# How long before timing out a python file import while filling the DagBag -dagbag_import_timeout = 30 - -# How long before timing out a DagFileProcessor, which processes a dag file -dag_file_processor_timeout = 30 - -# The class to use for running task instances in a subprocess -task_runner = StandardTaskRunner - -# If set, tasks without a `run_as_user` argument will be run with this user -# Can be used to de-elevate a sudo user running Airflow when executing tasks -default_impersonation = - -# What security module to use (for example kerberos): -security = - -# If set to False enables some unsecure features like Charts and Ad Hoc Queries. -# In 2.0 will default to True. -secure_mode = True - -# Turn unit test mode on (overwrites many configuration options with test -# values at runtime) -unit_test_mode = False - -# Name of handler to read task instance logs. -# Default to use task handler. -task_log_reader = task - -# Whether to enable pickling for xcom (note that this is insecure and allows for -# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False). -enable_xcom_pickling = True - -# When a task is killed forcefully, this is the amount of time in seconds that -# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED -killed_task_cleanup_time = 60 - -# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or -# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params. -dag_run_conf_overrides_params = False - -# Worker initialisation check to validate Metadata Database connection -worker_precheck = False - -# When discovering DAGs, ignore any files that don't contain the strings `DAG` and `airflow`. -dag_discovery_safe_mode = False - -# The number of retries each task is going to have by default. Can be overridden at dag or task level. -default_task_retries = 1 - -# Whether to serialises DAGs and persist them in DB. -# If set to True, Webserver reads from DB instead of parsing DAG files -# More details: https://airflow.apache.org/howto/enable-dag-serialization.html -store_serialized_dags = False - -# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. -min_serialized_dag_update_interval = 30 - -[cli] -# In what way should the cli access the API. The LocalClient will use the -# database directly, while the json_client will use the api running on the -# webserver -api_client = airflow.api.client.local_client - -# If you set web_server_url_prefix, do NOT forget to append it here, ex: -# endpoint_url = http://localhost:8080/myroot -# So api will look like: http://localhost:8080/myroot/api/experimental/... 
-endpoint_url = http://localhost:8080 - -[api] -# How to authenticate users of the API -auth_backend = airflow.api.auth.backend.default - -[lineage] -# what lineage backend to use -backend = - -[atlas] -sasl_enabled = False -host = -port = 21000 -username = -password = - -[operators] -# The default owner assigned to each new operator, unless -# provided explicitly or passed via `default_args` -default_owner = airflow -default_cpus = 1 -default_ram = 512 -default_disk = 512 -default_gpus = 0 - -[hive] -# Default mapreduce queue for HiveOperator tasks -default_hive_mapred_queue = - -[webserver] -# The base url of your website as airflow cannot guess what domain or -# cname you are using. This is used in automated emails that -# airflow sends to point links to the right web server -base_url = http://localhost:8080 - -# The ip specified when starting the web server -web_server_host = 0.0.0.0 - -# The port on which to run the web server -web_server_port = 8080 - -# Paths to the SSL certificate and key for the web server. When both are -# provided SSL will be enabled. This does not change the web server port. -web_server_ssl_cert = -web_server_ssl_key = - -# Number of seconds the webserver waits before killing gunicorn master that doesn't respond -web_server_master_timeout = 120 - -# Number of seconds the gunicorn webserver waits before timing out on a worker -web_server_worker_timeout = 120 - -# Number of workers to refresh at a time. When set to 0, worker refresh is -# disabled. When nonzero, airflow periodically refreshes webserver workers by -# bringing up new ones and killing old ones. -worker_refresh_batch_size = 1 - -# Number of seconds to wait before refreshing a batch of workers. -worker_refresh_interval = 30 - -# Secret key used to run your flask app -secret_key = temporary_key - -# Number of workers to run the Gunicorn web server -workers = 4 - -# The worker class gunicorn should use. Choices include -# sync (default), eventlet, gevent -worker_class = sync - -# Log files for the gunicorn webserver. '-' means log to stderr. -access_logfile = - -error_logfile = - - -# Expose the configuration file in the web server -# This is only applicable for the flask-admin based web UI (non FAB-based). -# In the FAB-based web UI with RBAC feature, -# access to configuration is controlled by role permissions. -expose_config = True - -# Set to true to turn on authentication: -# https://airflow.apache.org/security.html#web-authentication -authenticate = False - -# Filter the list of dags by owner name (requires authentication to be enabled) -filter_by_owner = False - -# Filtering mode. Choices include user (default) and ldapgroup. -# Ldap group filtering requires using the ldap backend -# -# Note that the ldap server needs the "memberOf" overlay to be set up -# in order to user the ldapgroup mode. -owner_mode = user - -# Default DAG view. Valid values are: -# tree, graph, duration, gantt, landing_times -dag_default_view = graph - -# Default DAG orientation. Valid values are: -# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top) -dag_orientation = LR - -# Puts the webserver in demonstration mode; blurs the names of Operators for -# privacy. -demo_mode = False - -# The amount of time (in secs) webserver will wait for initial handshake -# while fetching logs from other worker machine -log_fetch_timeout_sec = 3 - -# By default, the webserver shows paused DAGs. 
Flip this to hide paused -# DAGs by default -hide_paused_dags_by_default = False - -# Consistent page size across all listing views in the UI -page_size = 50 - -# Use FAB-based webserver with RBAC feature -rbac = False - -# Define the color of navigation bar -navbar_color = #007A87 - -# Default dagrun to show in UI -default_dag_run_display_number = 25 - -# Enable werkzeug `ProxyFix` middleware -enable_proxy_fix = False - -# Set secure flag on session cookie -cookie_secure = False - -# Set samesite policy on session cookie -cookie_samesite = - -# Default setting for wrap toggle on DAG code and TI log views. -default_wrap = False - -# Send anonymous user activity to your analytics tool -# analytics_tool = # choose from google_analytics, segment, or metarouter -# analytics_id = XXXXXXXXXXX - -[email] -email_backend = airflow.utils.email.send_email_smtp - -[smtp] -# If you want airflow to send emails on retries, failure, and you want to use -# the airflow.utils.email.send_email_smtp function, you have to configure an -# smtp server here -smtp_host = localhost -smtp_starttls = True -smtp_ssl = False -# Uncomment and set the user/pass settings if you want to use SMTP AUTH -# smtp_user = airflow -# smtp_password = airflow -smtp_port = 25 -smtp_mail_from = airflow@example.com - -[celery] -# This section only applies if you are using the CeleryExecutor in -# [core] section above - -# The app name that will be used by celery -celery_app_name = airflow.executors.celery_executor - -# The concurrency that will be used when starting workers with the -# "airflow worker" command. This defines the number of task instances that -# a worker will take, so size up your workers based on the resources on -# your worker box and the nature of your tasks -worker_concurrency = 16 - -# The maximum and minimum concurrency that will be used when starting workers with the -# "airflow worker" command (always keep minimum processes, but grow to maximum if necessary). -# Note the value should be "max_concurrency,min_concurrency" -# Pick these numbers based on resources on worker box and the nature of the task. -# If autoscale option is available, worker_concurrency will be ignored. -# http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale -# worker_autoscale = 16,12 - -# When you start an airflow worker, airflow starts a tiny web server -# subprocess to serve the workers local log files to the airflow main -# web server, who then builds pages and sends them to users. This defines -# the port on which the logs are served. It needs to be unused, and open -# visible from the main web server to connect into the workers. -worker_log_server_port = 8793 - -# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally -# a sqlalchemy database. Refer to the Celery documentation for more -# information. -# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings -broker_url = redis://redis:6379/1 - -# The Celery result_backend. When a job finishes, it needs to update the -# metadata of the job. 
Therefore it will post a message on a message bus, -# or insert it into a database (depending of the backend) -# This status is used by the scheduler to update the state of the task -# The use of a database is highly recommended -# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings -result_backend = db+postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB - -# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start -# it `airflow flower`. This defines the IP that Celery Flower runs on -flower_host = 0.0.0.0 - -# The root URL for Flower -# Ex: flower_url_prefix = /flower -flower_url_prefix = - -# This defines the port that Celery Flower runs on -flower_port = 5555 - -# Securing Flower with Basic Authentication -# Accepts user:password pairs separated by a comma -# Example: flower_basic_auth = user1:password1,user2:password2 -flower_basic_auth = - -# Default queue that tasks get assigned to and that worker listen on. -default_queue = default - -# How many processes CeleryExecutor uses to sync task state. -# 0 means to use max(1, number of cores - 1) processes. -sync_parallelism = 0 - -# Import path for celery configuration options -celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG - -# In case of using SSL -ssl_active = False -ssl_key = -ssl_cert = -ssl_cacert = - -# Celery Pool implementation. -# Choices include: prefork (default), eventlet, gevent or solo. -# See: -# https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency -# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html -pool = prefork - -[celery_broker_transport_options] -# This section is for specifying options which can be passed to the -# underlying celery broker transport. See: -# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options - -# The visibility timeout defines the number of seconds to wait for the worker -# to acknowledge the task before the message is redelivered to another worker. -# Make sure to increase the visibility timeout to match the time of the longest -# ETA you're planning to use. -# -# visibility_timeout is only supported for Redis and SQS celery brokers. -# See: -# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options -# -#visibility_timeout = 21600 - -[dask] -# This section only applies if you are using the DaskExecutor in -# [core] section above - -# The IP address and port of the Dask cluster's scheduler. -cluster_address = 127.0.0.1:8786 -# TLS/ SSL settings to access a secured Dask scheduler. -tls_ca = -tls_cert = -tls_key = - -[scheduler] -# Task instances listen for external kill signal (when you clear tasks -# from the CLI or the UI), this defines the frequency at which they should -# listen (in seconds). -job_heartbeat_sec = 5 - -# The scheduler constantly tries to trigger new tasks (look at the -# scheduler section in the docs for more information). This defines -# how often the scheduler should run (in seconds). 
-scheduler_heartbeat_sec = 5 - -# after how much time should the scheduler terminate in seconds -# -1 indicates to run continuously (see also num_runs) -run_duration = -1 - -# The number of times to try to schedule each DAG file -# -1 indicates unlimited number -num_runs = -1 - -# The number of seconds to wait between consecutive DAG file processing -processor_poll_interval = 1 - -# after how much time (seconds) a new DAGs should be picked up from the filesystem -min_file_process_interval = 0 - -# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. -dag_dir_list_interval = 180 - -# How often should stats be printed to the logs -print_stats_interval = 30 - -# If the last scheduler heartbeat happened more than scheduler_health_check_threshold ago (in seconds), -# scheduler is considered unhealthy. -# This is used by the health check in the "/health" endpoint -# This is used by the health check in the "/health" endpoint -scheduler_health_check_threshold = 30 - -child_process_log_directory = /usr/local/airflow/logs/scheduler - -# Local task jobs periodically heartbeat to the DB. If the job has -# not heartbeat in this many seconds, the scheduler will mark the -# associated task instance as failed and will re-schedule the task. -scheduler_zombie_task_threshold = 300 - -# Turn off scheduler catchup by setting this to False. -# Default behavior is unchanged and -# Command Line Backfills still work, but the scheduler -# will not do scheduler catchup if this is False, -# however it can be set on a per DAG basis in the -# DAG definition (catchup) -catchup_by_default = False - -# This changes the batch size of queries in the scheduling main loop. -# If this is too high, SQL query performance may be impacted by one -# or more of the following: -# - reversion to full table scan -# - complexity of query predicate -# - excessive locking -# -# Additionally, you may hit the maximum allowable query length for your db. -# -# Set this to 0 for no limit (not advised) -max_tis_per_query = 512 - -# Statsd (https://github.com/etsy/statsd) integration settings -statsd_on = False -statsd_host = localhost -statsd_port = 8125 -statsd_prefix = airflow - -# The scheduler can run multiple threads in parallel to schedule dags. -# This defines how many threads will run. -max_threads = 2 - -authenticate = False - -# Turn off scheduler use of cron intervals by setting this to False. -# DAGs submitted manually in the web UI or with trigger_dag will still run. -use_job_schedule = True - -[ldap] -# set this to ldaps://: -uri = -user_filter = objectClass=* -user_name_attr = uid -group_member_attr = memberOf -superuser_filter = -data_profiler_filter = -bind_user = cn=Manager,dc=example,dc=com -bind_password = insecure -basedn = dc=example,dc=com -cacert = /etc/ca/ldap_ca.crt -search_scope = LEVEL - -# This setting allows the use of LDAP servers that either return a -# broken schema, or do not return a schema. -ignore_malformed_schema = False - -[mesos] -# Mesos master address which MesosExecutor will connect to. 
-master = localhost:5050 - -# The framework name which Airflow scheduler will register itself as on mesos -framework_name = Airflow - -# Number of cpu cores required for running one task instance using -# 'airflow run --local -p ' -# command on a mesos slave -task_cpu = 1 - -# Memory in MB required for running one task instance using -# 'airflow run --local -p ' -# command on a mesos slave -task_memory = 256 - -# Enable framework checkpointing for mesos -# See http://mesos.apache.org/documentation/latest/slave-recovery/ -checkpoint = False - -# Failover timeout in milliseconds. -# When checkpointing is enabled and this option is set, Mesos waits -# until the configured timeout for -# the MesosExecutor framework to re-register after a failover. Mesos -# shuts down running tasks if the -# MesosExecutor framework fails to re-register within this timeframe. -# failover_timeout = 604800 - -# Enable framework authentication for mesos -# See http://mesos.apache.org/documentation/latest/configuration/ -authenticate = False - -# Mesos credentials, if authentication is enabled -# default_principal = admin -# default_secret = admin - -# Optional Docker Image to run on slave before running the command -# This image should be accessible from mesos slave i.e mesos slave -# should be able to pull this docker image before executing the command. -# docker_image_slave = puckel/docker-airflow - -[kerberos] -ccache = /tmp/airflow_krb5_ccache -# gets augmented with fqdn -principal = airflow -reinit_frequency = 3600 -kinit_path = kinit -keytab = airflow.keytab - -[github_enterprise] -api_rev = v3 - -[admin] -# UI to hide sensitive variable fields when set to True -hide_sensitive_variable_fields = True - -[elasticsearch] -# Elasticsearch host -host = -# Format of the log_id, which is used to query for a given tasks logs -log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}} -# Used to mark the end of a log stream for a task -end_of_log_mark = end_of_log -# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id -# Code will construct log_id using the log_id template from the argument above. -# NOTE: The code will prefix the https:// automatically, don't include that here. -frontend = -# Write the task logs to the stdout of the worker, rather than the default files -write_stdout = False -# Instead of the default log formatter, write the log lines as JSON -json_format = False -# Log fields to also attach to the json output, if enabled -json_fields = asctime, filename, lineno, levelname, message - -[elasticsearch_configs] - -use_ssl = False -verify_certs = True - -[kubernetes] -# The repository, tag and imagePullPolicy of the Kubernetes Image for the Worker to Run -worker_container_repository = -worker_container_tag = -worker_container_image_pull_policy = IfNotPresent - -# If True (default), worker pods will be deleted upon termination -delete_worker_pods = True - -# Number of Kubernetes Worker Pod creation calls per scheduler loop -worker_pods_creation_batch_size = 1 - -# The Kubernetes namespace where airflow workers should be created. 
Defaults to `default` -namespace = - -# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) -airflow_configmap = - -# For docker image already contains DAGs, this is set to `True`, and the worker will search for dags in dags_folder, -# otherwise use git sync or dags volume claim to mount DAGs -dags_in_image = True - -# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs -dags_volume_subpath = - -# For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) -dags_volume_claim = - -# For volume mounted logs, the worker will look in this subpath for logs -logs_volume_subpath = - -# A shared volume claim for the logs -logs_volume_claim = - -# For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync) -# Useful in local environment, discouraged in production -dags_volume_host = - -# A hostPath volume for the logs -# Useful in local environment, discouraged in production -logs_volume_host = - -# A list of configMapsRefs to envFrom. If more than one configMap is -# specified, provide a comma separated list: configmap_a,configmap_b -env_from_configmap_ref = - -# A list of secretRefs to envFrom. If more than one secret is -# specified, provide a comma separated list: secret_a,secret_b -env_from_secret_ref = - -# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim) -git_repo = -git_branch = -git_subpath = - -# The specific rev or hash the git_sync init container will checkout -# This becomes GIT_SYNC_REV environment variable in the git_sync init container for worker pods -git_sync_rev = - -# Use git_user and git_password for user authentication or git_ssh_key_secret_name and git_ssh_key_secret_key -# for SSH authentication -git_user = -git_password = -git_sync_root = /git -git_sync_dest = repo -# Mount point of the volume if git-sync is being used. -# i.e. {AIRFLOW_HOME}/dags -git_dags_folder_mount_point = - -# To get Git-sync SSH authentication set up follow this format -# -# airflow-secrets.yaml: -# --- -# apiVersion: v1 -# kind: Secret -# metadata: -# name: airflow-secrets -# data: -# # key needs to be gitSshKey -# gitSshKey: -# --- -# airflow-configmap.yaml: -# apiVersion: v1 -# kind: ConfigMap -# metadata: -# name: airflow-configmap -# data: -# known_hosts: | -# github.com ssh-rsa <...> -# airflow.cfg: | -# ... -# -# git_ssh_key_secret_name = airflow-secrets -# git_ssh_known_hosts_configmap_name = airflow-configmap -git_ssh_key_secret_name = -git_ssh_known_hosts_configmap_name = - -# To give the git_sync init container credentials via a secret, create a secret -# with two fields: GIT_SYNC_USERNAME and GIT_SYNC_PASSWORD (example below) and -# add `git_sync_credentials_secret = ` to your airflow config under the kubernetes section -# -# Secret Example: -# apiVersion: v1 -# kind: Secret -# metadata: -# name: git-credentials -# data: -# GIT_SYNC_USERNAME: -# GIT_SYNC_PASSWORD: -git_sync_credentials_secret = - -# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync -git_sync_container_repository = k8s.gcr.io/git-sync -git_sync_container_tag = v3.1.1 -git_sync_init_container_name = git-sync-clone -git_sync_run_as_user = 65533 - -# The name of the Kubernetes service account to be associated with airflow workers, if any. -# Service accounts are required for workers that require access to secrets or cluster resources. 
-# See the Kubernetes RBAC documentation for more: -# https://kubernetes.io/docs/admin/authorization/rbac/ -worker_service_account_name = - -# Any image pull secrets to be given to worker pods, If more than one secret is -# required, provide a comma separated list: secret_a,secret_b -image_pull_secrets = - -# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors -# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2 -gcp_service_account_keys = - -# Use the service account kubernetes gives to pods to connect to kubernetes cluster. -# It's intended for clients that expect to be running inside a pod running on kubernetes. -# It will raise an exception if called from a process not running in a kubernetes environment. -in_cluster = True - -# When running with in_cluster=False change the default cluster_context or config_file -# options to Kubernetes client. Leave blank these to use default behaviour like `kubectl` has. -# cluster_context = -# config_file = - - -# Affinity configuration as a single line formatted JSON object. -# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.): -# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core -affinity = - -# A list of toleration objects as a single line formatted JSON array -# See: -# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core -tolerations = - -# **kwargs parameters to pass while calling a kubernetes client core_v1_api methods from Kubernetes Executor -# provided as a single line formatted JSON dictionary string. -# List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis -# See: -# https://raw.githubusercontent.com/kubernetes-client/python/master/kubernetes/client/apis/core_v1_api.py -# Note that if no _request_timeout is specified, the kubernetes client will wait indefinitely for kubernetes -# api responses, which will cause the scheduler to hang. The timeout is specified as [connect timeout, read timeout] -kube_client_request_args = {"_request_timeout" : [60,60] } - -# Worker pods security context options -# See: -# https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ - -# Specifies the uid to run the first process of the worker pods containers as -run_as_user = - -# Specifies a gid to associate with all containers in the worker pods -# if using a git_ssh_key_secret_name use an fs_group -# that allows for the key to be read, e.g. 65533 -fs_group = - -[kubernetes_node_selectors] -# The Key-value pairs to be given to worker pods. -# The worker pods will be scheduled to the nodes of the specified key-value pairs. -# Should be supplied in the format: key = value - -[kubernetes_annotations] -# The Key-value annotations pairs to be given to worker pods. -# Should be supplied in the format: key = value - -[kubernetes_environment_variables] -# The scheduler sets the following environment variables into your workers. You may define as -# many environment variables as needed and the kubernetes launcher will set them in the launched workers. -# Environment variables in this section are defined as follows -# = -# -# For example if you wanted to set an environment variable with value `prod` and key -# `ENVIRONMENT` you would follow the following format: -# ENVIRONMENT = prod -# -# Additionally you may override worker airflow settings with the AIRFLOW__
__ -# formatting as supported by airflow normally. - -[kubernetes_secrets] -# The scheduler mounts the following secrets into your workers as they are launched by the -# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the -# defined secrets and mount them as secret environment variables in the launched workers. -# Secrets in this section are defined as follows -# = = -# -# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the -# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into -# your workers you would follow the following format: -# POSTGRES_PASSWORD = airflow-secret=postgres_credentials -# -# Additionally you may override worker airflow settings with the AIRFLOW__
__ -# formatting as supported by airflow normally. - -[kubernetes_labels] -# The Key-value pairs to be given to worker pods. -# The worker pods will be given these static labels, as well as some additional dynamic labels -# to identify the task. -# Should be supplied in the format: key = value diff --git a/config/entrypoint.sh b/config/entrypoint.sh index 17f07b2..63e2657 100755 --- a/config/entrypoint.sh +++ b/config/entrypoint.sh @@ -1,45 +1,34 @@ #!/usr/bin/env bash -TRY_LOOP="20" - -: "${REDIS_HOST:="redis"}" -: "${REDIS_PORT:="6379"}" - -: "${POSTGRES_HOST:="postgres"}" -: "${POSTGRES_PORT:="5432"}" - -wait_for_port() { - local name="$1" host="$2" port="$3" - local j=0 - while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do - j=$((j+1)) - if [ $j -ge $TRY_LOOP ]; then - echo >&2 "$(date) - $host:$port still not reachable, giving up" - exit 1 - fi - echo "$(date) - waiting for $name... $j/$TRY_LOOP" - sleep 5 - done -} - - -wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT" - -wait_for_port "Redis" "$REDIS_HOST" "$REDIS_PORT" +# copy files in EFS volume +cp -R /dags/* /usr/local/airflow/dags/ +# start Airflow service as per +#the previous parameter in command container case "$1" in webserver) - airflow initdb + airflow db init \ + && airflow users create \ + --role Admin \ + --username airflow \ + --password airflow \ + --email airflow@example.com \ + --firstname airflow \ + --lastname airflow sleep 5 exec airflow webserver ;; - worker|scheduler) + scheduler) sleep 15 exec airflow "$@" ;; + worker) + sleep 15 + exec airflow celery "$@" + ;; flower) sleep 15 - exec airflow "$@" + exec airflow celery "$@" ;; version) exec airflow "$@" diff --git a/config/requirements.txt b/config/requirements.txt new file mode 100644 index 0000000..189f608 --- /dev/null +++ b/config/requirements.txt @@ -0,0 +1,4 @@ +boto3 +pandas +apache-airflow-providers-amazon +awscli diff --git a/dags/DagFetcher.py b/dags/DagFetcher.py new file mode 100644 index 0000000..1916532 --- /dev/null +++ b/dags/DagFetcher.py @@ -0,0 +1,27 @@ +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.utils.dates import days_ago +from airflow.models import Variable + +airflow_bucket = Variable.get('airflow_bucket_dags') +airflow_home = Variable.get('airflow_home_dags') + + +args = { + 'owner': 'Airflow', + 'start_date': days_ago(0), + 'depends_on_past': False +} + +folders = ['dags', 'planning', 'quality'] + +with DAG(dag_id='DagFetcher', + default_args=args, + schedule_interval='*/5 * * * *', + tags=['example'], + catchup=False, + is_paused_upon_creation=False) as dag: + + tasks = BashOperator(task_id="folder", + bash_command=f"aws s3 sync {airflow_bucket} {airflow_home} --delete", + dag=dag) diff --git a/dags/my_first_dag.py b/dags/my_first_dag.py deleted file mode 100644 index 54902eb..0000000 --- a/dags/my_first_dag.py +++ /dev/null @@ -1,64 +0,0 @@ -from datetime import datetime -import logging - -from airflow import DAG -from airflow.operators.python_operator import PythonOperator - -import pandas -import toolz - -logger = logging.getLogger(__name__) - -default_args = { - 'owner': 'nicor88', - 'start_date': datetime(2019, 2, 20), - 'depends_on_past': False, - 'provide_context': True -} - -dag = DAG('my_first_dag', - description='My first Airflow DAG', - schedule_interval='*/5 * * * *', - catchup=False, - default_args=default_args) - - -def task_1(**kwargs): - output = {'output': 'hello world 1', 'execution_time': str(datetime.now())} - logger.info(output) - 
logger.info(f'Pandas version: {pandas.__version__}') - logger.info(f'Toolz version: {toolz.__version__}') - return output - - -def task_2(**kwargs): - ti = kwargs['ti'] - output_task_1 = ti.xcom_pull(key='return_value', task_ids='task_1') - logger.info(output_task_1) - return {'output': 'hello world 2', 'execution_time': str(datetime.now())} - - -def task_3(**kwargs): - logger.info('Log from task 3') - return {'output': 'hello world 3', 'execution_time': str(datetime.now())} - - -t1 = PythonOperator( - task_id='task_1', - dag=dag, - python_callable=task_1 -) - -t2 = PythonOperator( - task_id='task_2', - dag=dag, - python_callable=task_2 -) - -t3 = PythonOperator( - task_id='task_3', - dag=dag, - python_callable=task_3 -) - -t1 >> [t2, t3] diff --git a/dags/my_second_dag.py b/dags/my_second_dag.py deleted file mode 100644 index e5fa721..0000000 --- a/dags/my_second_dag.py +++ /dev/null @@ -1,52 +0,0 @@ -from datetime import datetime -import logging - -from airflow import DAG -from airflow.operators.python_operator import PythonOperator - -import pandas -import toolz - -logger = logging.getLogger(__name__) - -default_args = { - 'owner': 'nicor88', - 'start_date': datetime(2019, 2, 20), - 'depends_on_past': False, - 'provide_context': True -} - -dag = DAG('my_second_dag', - description='My second Airflow DAG', - schedule_interval=None, - catchup=False, - default_args=default_args) - - -def task_1(**kwargs): - output = {'output': 'hello world 1', 'execution_time': str(datetime.now())} - logger.info(output) - logger.info(f'Pandas version: {pandas.__version__}') - logger.info(f'Toolz version: {toolz.__version__}') - return output - - -def task_2(**kwargs): - ti = kwargs['ti'] - output_task_1 = ti.xcom_pull(key='return_value', task_ids='task_1') - logger.info(output_task_1) - return {'output': 'hello world 2', 'execution_time': str(datetime.now())} - - -t1 = PythonOperator( - task_id='task_1', - dag=dag, - python_callable=task_1 -) -t2 = PythonOperator( - task_id='task_2', - dag=dag, - python_callable=task_2 -) - -t1 >> t2 diff --git a/infrastructure/airflow_flower.tf b/infrastructure/airflow_flower.tf deleted file mode 100644 index 96fc611..0000000 --- a/infrastructure/airflow_flower.tf +++ /dev/null @@ -1,105 +0,0 @@ -resource "aws_security_group" "flower" { - name = "${var.project_name}-${var.stage}-flower-sg" - description = "Allow all inbound traffic for Flower" - vpc_id = aws_vpc.vpc.id - - ingress { - from_port = 5555 - to_port = 5555 - protocol = "tcp" - cidr_blocks = ["0.0.0.0/0"] - } - - egress { - from_port = 0 - to_port = 0 - protocol = "-1" - cidr_blocks = ["0.0.0.0/0"] - } - - tags = { - Name = "${var.project_name}-${var.stage}-flower-sg" - } -} - - -resource "aws_ecs_task_definition" "flower" { - family = "${var.project_name}-${var.stage}-flower" - network_mode = "awsvpc" - execution_role_arn = aws_iam_role.ecs_task_iam_role.arn - requires_compatibilities = ["FARGATE"] - cpu = "512" # the valid CPU amount for 2 GB is from from 256 to 1024 - memory = "1024" - container_definitions = <
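As a usage note for the `DagFetcher` DAG added in `dags/DagFetcher.py` above: it reads two Airflow Variables before syncing DAGs from S3 into the EFS-backed dags folder. A minimal sketch of creating them with the Airflow CLI inside a running container; the bucket name is a placeholder, and the target path matches the image's `AIRFLOW_HOME`:

```sh
# Placeholder bucket; the Variable names come from dags/DagFetcher.py
airflow variables set airflow_bucket_dags s3://my-airflow-dags-bucket/dags
airflow variables set airflow_home_dags /usr/local/airflow/dags
```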