
Scheduling from Airflow

The same parameters that are passed to spark-submit can also be supplied directly from Airflow and other schedulers, allowing for seamless configuration and execution of Spark jobs.
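
As a sketch of what this looks like in practice (not taken from the original example), the properties that would otherwise be passed to spark-submit via --conf can be placed in an operator's conf dictionary. The snippet below uses SparkSubmitOperator from the apache-spark provider; the application path, package version, transport URL, and namespace are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Sketch only: the application path, Scala/package version, transport URL,
# and namespace below are placeholders.
with DAG(dag_id="spark_job_example", start_date=datetime(2025, 1, 1), schedule=None) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        application="/path/to/your-prog.py",
        packages="io.openlineage:openlineage-spark_2.12:1.31.0",
        conf={
            # The same properties that would otherwise be spark-submit --conf options.
            "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
            "spark.openlineage.transport.type": "http",
            "spark.openlineage.transport.url": "http://localhost:5000",
            "spark.openlineage.namespace": "my_spark_namespace",
        },
    )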

When using the OpenLineage Airflow integration with operators that submit Spark jobs, the entire Spark OpenLineage integration can be configured directly within Airflow.

Automatic Injection

Several operators used to submit Spark jobs can, in their newest versions, automatically inject the OpenLineage Spark integration into the Spark job.

Two types of configuration can be injected automatically: parent job info (see Preserving Job Hierarchy) and transport info, which passes the same transport configuration from Airflow to the Spark job.

To enable injecting parent job info, the Airflow configuration option spark_inject_parent_job_info must be set to true.

To enable injecting transport info, the Airflow configuration option spark_inject_transport_info must be set to true.
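
For example, both options can be enabled through Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable mapping. The snippet below is a minimal sketch that assumes the options live in the [openlineage] section of the Airflow configuration, alongside the other OpenLineage provider settings.

# Minimal sketch: enabling both injection options via environment variables,
# assuming they belong to the [openlineage] configuration section.
import os

os.environ["AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO"] = "true"
os.environ["AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO"] = "true"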

The following operators are supported:

This list is non-exhaustive; check the documentation of the operator you are using to see whether it supports automatic injection.

Preserving Job Hierarchy

To establish a correct job hierarchy in lineage tracking, the Spark application and lineage backend require identifiers of the parent job that triggered the Spark job. These identifiers allow the Spark integration to automatically add a ParentRunFacet to the application-level OpenLineage event, facilitating the linkage of the Spark job to its originating (Airflow) job in the lineage graph.

The following properties are necessary for the automatic creation of the ParentRunFacet:

  • spark.openlineage.parentJobNamespace
  • spark.openlineage.parentJobName
  • spark.openlineage.parentRunId

Additionally, in version 1.31.0 and later, the following properties can be set; they are also added to the ParentRunFacet and make it easier to connect the root (top-level parent) job to its child jobs (see the sketch after this list):

  • spark.openlineage.rootParentJobNamespace
  • spark.openlineage.rootParentJobName
  • spark.openlineage.rootParentRunId
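
To illustrate, the resulting facet appears under the parent key of the run facets on the application-level event. The dictionary below is a rough sketch rather than an exact payload: all namespaces, names, and run IDs are placeholders, and the root entry is present only when the rootParent* properties are provided.

# Rough sketch of the parent run facet built from these properties;
# every value here is a placeholder.
parent_run_facet = {
    "parent": {
        "job": {"namespace": "airflow-namespace", "name": "dag_id.task_id"},
        "run": {"runId": "00000000-0000-0000-0000-000000000001"},
        # 1.31.0+: reference to the root (top-level) job, e.g. the Airflow DAG run.
        "root": {
            "job": {"namespace": "airflow-namespace", "name": "dag_id"},
            "run": {"runId": "00000000-0000-0000-0000-000000000002"},
        },
    }
}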

Refer to the Spark Configuration documentation for more information on these properties.

The OpenLineage Airflow integration provides macros that can be used to generate these identifiers dynamically.

Example

Below is an example of a DataprocSubmitJobOperator that submits a PySpark application to a Dataproc cluster:

t1 = DataprocSubmitJobOperator(
    task_id="task_id",
    project_id="project_id",
    region="eu-central2",
    job={
        "reference": {"project_id": "project_id"},
        "placement": {"cluster_name": "cluster_name"},
        "pyspark_job": {
            "main_python_file_uri": "gs://bucket/your-prog.py",
            "properties": {
                "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
                "spark.jars.packages": "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:1.31.0",
                "spark.openlineage.transport.url": openlineage_url,
                "spark.openlineage.transport.auth.apiKey": api_key,
                "spark.openlineage.transport.auth.type": "apiKey",
                "spark.openlineage.namespace": openlineage_spark_namespace,
                "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
                "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
                "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
            },
        },
    },
    dag=dag,
)