What is Apache Airflow?
Airflow is a platform to programmatically author, schedule, and monitor workflows or data pipelines. It does this by modelling each workflow as a Directed Acyclic Graph (DAG) of tasks. It is an open-source project, still in the Apache Incubator stage, that started in 2014 under the umbrella of Airbnb; since then it has earned an excellent reputation, with approximately 800 contributors and 13,000 stars on GitHub. Its main functions are to author, schedule, and monitor workflows.
Apache Airflow as a Solution
- Failures: retry automatically if a failure happens
- Monitoring: report success or failure status
- Dependencies:
  - Data dependencies – upstream data is missing
  - Execution dependencies – job 2 runs only after job 1 has finished
- Scalability: a centralized scheduler instead of cron jobs scattered across different machines
- Deployment: deploy new changes constantly
- Process Historic Data: backfill/rerun historical data (see the sketch after this list)
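To make these points concrete, here is a minimal sketch (not from the original post) of how retries, execution dependencies, and backfilling are typically expressed in a DAG definition. The DAG id, task ids, email address, and dates are made up for illustration only.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical settings for illustration only
default_args = {
    "owner": "airflow",
    "retries": 3,                         # Failures: retry up to 3 times
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between retries
    "email": ["alerts@example.com"],      # Monitoring: where to send alerts
    "email_on_failure": True,
    "start_date": datetime(2018, 1, 1),
}

dag = DAG(
    "example_solution_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=True,  # Process Historic Data: backfill runs since start_date
)

extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# Execution dependency: "load" only runs after "extract" has finished
extract >> load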
How Does Apache Airflow Work?
- Airflow models each workflow as a DAG (Directed Acyclic Graph) of tasks and executes those tasks on an array of workers, following the dependencies you specify; some of these workers can have specialised capabilities.
- DAGs are written in Python itself, which makes them easy to reuse and build on in further processes.
- This turns a workflow into well-defined code, which in turn makes the workflow testable, maintainable, collaborative, and versionable.
- In most of these workflows, tasks are not allowed to exchange data directly; only metadata is passed between them (see the XCom sketch after this list). Airflow is therefore not a streaming solution for data.
- Airflow's way of working is not really comparable with the "Spark Streaming" or "Storm" space; it is closer to Oozie.
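As mentioned above, tasks should not pass data directly; small pieces of metadata can be exchanged through Airflow's XCom mechanism. The following is a minimal sketch of that idea; the DAG id, task ids, and the pushed value are hypothetical.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("xcom_example_dag", start_date=datetime(2018, 9, 12),
          schedule_interval=None)

def push_metadata(**context):
    # Push a small piece of metadata (e.g. a file path), not the data itself
    context["ti"].xcom_push(key="output_path", value="data/output_one.txt")

def pull_metadata(**context):
    # Pull the metadata pushed by the upstream task
    path = context["ti"].xcom_pull(task_ids="push_task", key="output_path")
    print("Upstream wrote its output to {}".format(path))

push = PythonOperator(task_id="push_task", python_callable=push_metadata,
                      provide_context=True, dag=dag)
pull = PythonOperator(task_id="pull_task", python_callable=pull_metadata,
                      provide_context=True, dag=dag)

push >> pull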
Why Apache Airflow?
- The log entries of every execution are gathered in one place.
- Airflow is worth using because it can automate workflow development: the workflow itself is organised as code.
- It can send a report message, for example through Slack, if an error occurs because a DAG fails (see the callback sketch after this list).
- Inside the DAGs, it gives a clear picture of the dependencies.
- Its ability to generate metadata makes it easy to regenerate (re-run) uploads.
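As an example of the alerting point above, failure notifications are usually wired in through the on_failure_callback hook of a task or of default_args. The sketch below only formats a message; post_to_slack is a hypothetical placeholder standing in for whatever Slack operator or webhook client you actually use.

def post_to_slack(message):
    # Hypothetical placeholder: in practice this would call a Slack
    # operator or post to an incoming webhook.
    print("Sending to Slack: {}".format(message))

def notify_failure(context):
    # Airflow passes a context dict describing the failed task instance
    task_id = context["task_instance"].task_id
    dag_id = context["task_instance"].dag_id
    post_to_slack("DAG {} failed on task {}".format(dag_id, task_id))

default_args = {
    "owner": "airflow",
    "on_failure_callback": notify_failure,  # called whenever a task fails
}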
Principles
Dynamic: Airflow pipelines are configured as code (Python), allowing for dynamic pipeline generation; you can write code that instantiates pipelines dynamically.
Extensible: Easily define your own operators and executors, and extend the library so that it fits the level of abstraction that suits your environment (a custom-operator sketch follows these principles).
Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.
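To illustrate the "Extensible" principle, here is a minimal sketch of a custom operator; the class name and message are made up. In Airflow 1.x a custom operator subclasses BaseOperator and implements execute().

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):
    """Hypothetical operator that just logs a greeting."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # Whatever execute() returns is stored as an XCom by default
        message = "Hello, {}!".format(self.name)
        self.log.info(message)
        return message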
Basic concepts
DAG: a description of the order in which work should take place
Operator: a class that acts as a template for carrying out some work
Task: a parameterized instance of an operator
Task Instance: a task that has been assigned to a DAG and has a state associated with a specific run of the DAG
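To tie these concepts together, here is a short, purely illustrative mapping; the DAG id and task id are hypothetical.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# DAG: describes the order in which work should take place
dag = DAG("concepts_demo", start_date=datetime(2018, 9, 12),
          schedule_interval="@daily")

# Operator: BashOperator is the template; instantiating it with parameters
# yields a Task named "say_hello" that belongs to the DAG
say_hello = BashOperator(task_id="say_hello", bash_command="echo hello",
                         dag=dag)

# Task Instance: created for each run of the DAG (e.g. the "say_hello" task
# for the 2018-09-12 run) and carries a state such as queued, running,
# success, or failed.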
How to use Airflow?
- First of all, set up all the configurations in airflow.cfg
- Initialize the metadata database
- Use operators such as the BashOperator, Dingding operators, Google Cloud operators, the PythonOperator, etc.
- Manage connections to connect with your environment, such as AWS, GCP, MySQL, PostgreSQL, SSH connections, etc.
Connections can be created from the Airflow UI (Admin > Connections) or via the CLI, and are then referenced from operators and hooks, as sketched below.
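A minimal sketch of using a stored connection, assuming a PostgreSQL connection has already been saved under the hypothetical id my_postgres.

from airflow.hooks.base_hook import BaseHook
from airflow.hooks.postgres_hook import PostgresHook

# Look up the stored connection metadata (host, login, schema, ...)
conn = BaseHook.get_connection("my_postgres")
print(conn.host, conn.schema)

# Or let a hook use the connection directly to talk to the database
hook = PostgresHook(postgres_conn_id="my_postgres")
rows = hook.get_records("SELECT 1")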
A simple workflow
Let's see how we can implement a simple pipeline composed of two tasks. The first task generates a .txt file containing a word ("pipeline" in this case); the second task reads that file and decorates the line by prepending a word to it. First we define and initialise the DAG, then we add two operators to it: a BashOperator, which can run essentially any bash command or script, and a PythonOperator, which executes Python code.
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

OUTPUT_ONE_PATH = "data/output_one.txt"
OUTPUT_TWO_PATH = "data/output_two.txt"


def decorate_file(input_path, output_path):
    # Read the line produced by the first task and prepend "My " to it
    with open(input_path, "r") as in_file:
        line = in_file.read()
    with open(output_path, "w") as out_file:
        out_file.write("My " + line)


default_args = {
    "owner": "lorenzo",
    "depends_on_past": False,
    "start_date": datetime(2018, 9, 12),
    "email": ["l.peppoloni@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

# Define the DAG: run every day at 12:00
dag = DAG(
    "simple_dag",
    default_args=default_args,
    schedule_interval="0 12 * * *",
)

# First task: write the word "pipeline" to a text file
t1 = BashOperator(
    task_id="print_file",
    bash_command='echo "pipeline" > {}'.format(OUTPUT_ONE_PATH),
    dag=dag)

# Second task: read the file and write the decorated line to a new file
t2 = PythonOperator(
    task_id="decorate_file",
    python_callable=decorate_file,
    op_kwargs={"input_path": OUTPUT_ONE_PATH, "output_path": OUTPUT_TWO_PATH},
    dag=dag)

# Set the dependency: t2 runs downstream of t1
t1 >> t2
This is how a workflow can be designed by using different operators and defining the tasks.
Source:https://blog.knoldus.com/defining-your-workflow-why-not-airflow/amp/