Managing Dependencies in Data Pipelines

Data pipelines often have complex dependencies. Workflow systems let you describe these dependencies and schedule when pipelines run.

Apache Airflow

Apache Airflow (incubating) is a solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations, where an edge represents a logical dependency between operations.

Databricks supports tight integration with Airflow. The Airflow Databricks integration lets you take advantage of the optimized Spark engine offered by Databricks together with the scheduling features of Airflow.

Install the Airflow Databricks integration

The integration between Airflow and Databricks is available in Airflow version 1.9.0. To install the Airflow Databricks integration, run:

pip install "apache-airflow[databricks]"

To install extras (for example celery, s3, and password), run:

pip install "apache-airflow[databricks, celery, s3, password]"

DatabricksSubmitRunOperator operator

The Airflow Databricks integration provides DatabricksSubmitRunOperator as a node in your DAG of computations. This operator matches the Databricks jobs Runs Submit API endpoint and allows you to programmatically run notebooks and JARs uploaded to S3 or DBFS.

Configure a Databricks connection

To use DatabricksSubmitRunOperator you must provide credentials in the appropriate Airflow connection. By default, if you do not specify the databricks_conn_id parameter to DatabricksSubmitRunOperator, the operator tries to find credentials in the connection with the ID equal to databricks_default.

You can configure Airflow connections through the Airflow web UI as instructed in Managing Connections. For the Databricks connection, set the Host field to the hostname of your Databricks deployment, the Login field to token, the Password field to a Databricks-generated personal access token, and the Extra field to

{"token": "<your personal access token>"}

Example

In this example, we show how to set up a simple Airflow deployment that runs on your local machine and deploys an example DAG named example_databricks_operator that triggers runs in Databricks.

Initialize Airflow database

Initialize the SQLite database that Airflow uses to track miscellaneous metadata. In a production Airflow deployment, you would configure Airflow with a standard database. To perform the initialization, run:

airflow initdb

The SQLite database and default configuration for your Airflow deployment are initialized in ~/airflow.

DAG definition

A DAG definition is a Python file; in this example, it is named example_databricks_operator.py. The example runs two Databricks jobs with one linear dependency: the first Databricks job triggers a notebook located at /Users/airflow@example.com/PrepareData and the second runs a JAR located at dbfs:/lib/etl-0.1.jar. The example DAG definition constructs two DatabricksSubmitRunOperator tasks and then sets the dependency at the end with the set_downstream method. A skeleton version of the code looks something like:

notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    dag=dag,
    json=spark_jar_task_params)

notebook_task.set_downstream(spark_jar_task)

Import Airflow and required classes

The top of a DAG definition imports airflow, DAG, and DatabricksSubmitRunOperator:

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

Configure global arguments

The next section sets default arguments applied to each task in the DAG.

args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2)
}

The two interesting arguments are depends_on_past and start_date. Setting depends_on_past to True signals that a task should not be triggered unless the previous instance of the task completed successfully. The start_date argument determines when the first task instance is scheduled.
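
As an illustration only (not part of the example DAG), enabling depends_on_past in the same arguments would look like the following sketch.

# Sketch only: the same default arguments with depends_on_past enabled.
args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    # Each scheduled run waits for the previous instance of the same task to succeed.
    'depends_on_past': True,
    # Start two days in the past so the scheduler backfills earlier runs.
    'start_date': airflow.utils.dates.days_ago(2)
}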

Instantiate the DAG

The DAG instantiation statement gives the DAG a unique ID, attaches the default arguments, and gives it a daily schedule.

dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')

The next statement specifies the Spark version, node type, and number of workers in the cluster that will run your tasks. The schema of the specification matches the new_cluster field of the job Runs Submit endpoint.

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'i3.xlarge',
    'aws_attributes': {
        'availability': 'ON_DEMAND'
    },
    'num_workers': 8
}

Register tasks in DAG

For notebook_task, instantiate DatabricksSubmitRunOperator.

notebook_task_params = {
  'new_cluster': new_cluster,
  'notebook_task': {
    'notebook_path': '/Users/airflow@example.com/PrepareData',
  },
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
  task_id='notebook_task',
  dag=dag,
  json=notebook_task_params)

In this piece of code, the json parameter takes a Python dictionary that matches the Runs Submit endpoint.

For spark_jar_task, which runs a JAR located at dbfs:/lib/etl-0.1.jar, instantiate DatabricksSubmitRunOperator.

# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
  task_id='spark_jar_task',
  dag=dag,
  new_cluster=new_cluster,
  spark_jar_task={
    'main_class_name': 'com.example.ProcessData'
  },
  libraries=[
    {
      'jar': 'dbfs:/lib/etl-0.1.jar'
    }
  ]
)

To configure spark_jar_task to run downstream, use the set_downstream method on notebook_task to register the dependency.

notebook_task.set_downstream(spark_jar_task)

Notice that in notebook_task we used the json parameter to specify the full specification for the Runs Submit endpoint, while in spark_jar_task we flattened the top-level keys of the Runs Submit endpoint into parameters for DatabricksSubmitRunOperator. Although both ways of instantiating the operator are equivalent, the latter method does not allow you to use any new top-level fields such as spark_python_task or spark_submit_task. For details, see the DatabricksSubmitRunOperator API.
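
For example, a field such as spark_python_task, which is not exposed as a named parameter, could be passed through the json parameter. The following is only a sketch; the task ID, DBFS path, and parameters are hypothetical and not part of the example DAG.

# Sketch only: passing spark_python_task through the json parameter.
# The python_file path and parameters are hypothetical examples.
spark_python_task_params = {
  'new_cluster': new_cluster,
  'spark_python_task': {
    'python_file': 'dbfs:/lib/process_data.py',
    'parameters': ['--input', 'dbfs:/data/raw']
  },
}
spark_python_task = DatabricksSubmitRunOperator(
  task_id='spark_python_task',
  dag=dag,
  json=spark_python_task_params)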

Install and verify the DAG in Airflow

To install the DAG in Airflow, create the directory ~/airflow/dags and copy the DAG definition file into that directory.
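
For example, assuming the DAG definition file example_databricks_operator.py is in your current working directory:

mkdir -p ~/airflow/dags
cp example_databricks_operator.py ~/airflow/dags/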

To verify that Airflow has read in the DAG, run the list_dags command:

airflow list_dags

[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...

Visualize the DAG in the Airflow UI

You can visualize the DAG in the Airflow web UI. Run airflow webserver and connect to localhost:8080. Click example_databricks_operator to see many visualizations of your DAG. Here is an example:

[Image: Graph view of the example_databricks_operator DAG in the Airflow UI (airflow-tutorial.png)]

Add the Databricks connection to Airflow

The Databricks connection credentials are not specified in the DAG definition. By default, DatabricksSubmitRunOperator sets the databricks_conn_id parameter to databricks_default, so add a connection with the ID databricks_default through the web UI, as described in Configure a Databricks connection.

Test each task

To test notebook_task, run airflow test example_databricks_operator notebook_task <YYYY-MM-DD>; for spark_jar_task, run airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>. To run the DAG on a schedule, you would invoke the scheduler daemon process with the command airflow scheduler.
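
For example, with an arbitrary past execution date as a placeholder for the test commands, followed by starting the scheduler:

airflow test example_databricks_operator notebook_task 2017-07-01
airflow test example_databricks_operator spark_jar_task 2017-07-01
airflow scheduler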

After starting the scheduler, you should be able to see backfilled runs of your DAG in the web UI.