Skip to main content
Version: Next

Scheduling from Airflow

The same parameters that are passed to spark-submit can also be supplied directly from Airflow and other schedulers, allowing for seamless configuration and execution of Spark jobs.

When using the OpenLineage Airflow integration with operators that submit Spark jobs, the entire Spark OpenLineage integration can be configured directly within Airflow.

Preserving Job Hierarchy

To establish a correct job hierarchy in lineage tracking, the Spark application and lineage backend require identifiers of the parent job that triggered the Spark job. These identifiers allow the Spark integration to automatically add a parentRunFacet to the application-level OpenLineage event, facilitating the linkage of the Spark job to its originating (Airflow) job in the lineage graph.

The following properties are necessary for the automatic creation of the parentRunFacet:

  • spark.openlineage.parentJobNamespace
  • spark.openlineage.parentJobName
  • spark.openlineage.parentRunId

Refer to the Spark Configuration documentation for more information on these properties.

OpenLineage Airflow integration provides powerful macros that can be used to dynamically generate these identifiers.

Example

Below is an example of a DataprocSubmitJobOperator that submits a PySpark application to Dataproc cluster:

t1 = DataprocSubmitJobOperator(
task_id="task_id",
project_id="project_id",
region='eu-central2',
job={
"reference": {"project_id": "project_id"},
"placement": {"cluster_name": "cluster_name"},
"pyspark_job": {
"main_python_file_uri": "gs://bucket/your-prog.py",
"properties": {
"spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
"spark.jars.packages": "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:1.26.0",
"spark.openlineage.transport.url": openlineage_url,
"spark.openlineage.transport.auth.apiKey": api_key,
"spark.openlineage.transport.auth.type": "apiKey",
"spark.openlineage.namespace": openlineage_spark_namespace,
"spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
}
},
},
dag=dag
)