Spark jobs typically run on clusters of machines. A single machine hosts the "driver" application, which constructs a graph of jobs (e.g., reading data from a source; filtering, transforming, and joining records; and writing results to some sink) and manages execution of those jobs. Spark's fundamental abstraction is the Resilient Distributed Dataset (RDD), which encapsulates distributed reads and modifications of records. While RDDs can be used directly, it is far more common to work with Spark Datasets or DataFrames, an API that adds explicit schemas for better performance and the ability to interact with datasets using SQL. The DataFrame's declarative API enables Spark to optimize jobs by analyzing and manipulating an abstract query plan prior to execution.
Collecting Lineage in Spark
Collecting lineage requires hooking into Spark's ListenerBus in the driver application and
collecting and analyzing execution events as they happen. Both raw RDD and DataFrame jobs post events
to the listener bus during execution. These events expose the structure of the job, including the
optimized query plan, allowing the Spark integration to analyze the job for the datasets it consumes
and produces. This includes attributes about the storage (such as a location in GCS or S3, or table
names in a relational database or a warehouse such as Redshift or BigQuery) and schemas. In addition
to dataset and job lineage, Spark SQL jobs also report logical plans, which can be compared across
job runs to track important changes in query plans that may affect the correctness or speed of a job.
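The listener pattern described above can be illustrated with a toy model. This is only a sketch of the general idea in plain Python: `JobEvent`, `ListenerBus`, and `LineageCollector` are hypothetical names invented for illustration and are not Spark or OpenLineage classes.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Set, Tuple


@dataclass
class JobEvent:
    """Hypothetical stand-in for an execution event posted by the driver."""
    job_name: str
    inputs: List[str]
    outputs: List[str]


@dataclass
class ListenerBus:
    """Toy bus: forwards every posted event to each registered listener."""
    listeners: List[Callable[[JobEvent], None]] = field(default_factory=list)

    def add_listener(self, listener: Callable[[JobEvent], None]) -> None:
        self.listeners.append(listener)

    def post(self, event: JobEvent) -> None:
        for listener in self.listeners:
            listener(event)


class LineageCollector:
    """Records dataset -> job and job -> dataset edges for each observed event."""

    def __init__(self) -> None:
        self.edges: Set[Tuple[str, str]] = set()

    def __call__(self, event: JobEvent) -> None:
        for src in event.inputs:
            self.edges.add((src, event.job_name))
        for dst in event.outputs:
            self.edges.add((event.job_name, dst))


bus = ListenerBus()
collector = LineageCollector()
bus.add_listener(collector)

# The job itself is unaware of the collector; it only posts events to the bus.
bus.post(JobEvent("load_orders", ["s3://bucket/raw/orders"], ["warehouse.orders"]))
print(sorted(collector.edges))
```

The point of the design is that lineage is derived purely from observed events, so the application code does not need to change.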
A single Spark application may execute multiple jobs. The Spark OpenLineage integration maps one
Spark job to a single OpenLineage Job. The application will be assigned a Run id at startup and each
job that executes will report the application's Run id as its parent job run. Thus, an application
that reads one or more source datasets, writes an intermediate dataset, then transforms that
intermediate dataset and writes a final output dataset will report three jobs: the parent application
job, the initial job that reads the sources and creates the intermediate dataset, and the final job
that consumes the intermediate dataset and produces the final output. As an image:
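Expressed as data rather than as a picture, the parent/child relationship might look like the following sketch. It builds three simplified events whose shape follows the OpenLineage run event and parent run facet as we understand them; required bookkeeping fields (such as the producer and schema URLs) are omitted, and all job names, dataset names, and the `start_event` helper are hypothetical. It is not what the integration literally emits.

```python
import uuid
from datetime import datetime, timezone

NAMESPACE = "my_job_namespace"  # hypothetical job namespace


def start_event(job_name, run_id, parent=None, inputs=(), outputs=()):
    """Build a simplified START event; `parent` is a (job_name, run_id) pair."""
    facets = {}
    if parent is not None:
        parent_job, parent_run_id = parent
        # The parent facet links this run to the run that triggered it.
        facets["parent"] = {
            "run": {"runId": parent_run_id},
            "job": {"namespace": NAMESPACE, "name": parent_job},
        }
    return {
        "eventType": "START",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id, "facets": facets},
        "job": {"namespace": NAMESPACE, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
    }


app_run_id = str(uuid.uuid4())
app = ("my_spark_app", app_run_id)
intermediate = ("gs://bucket", "tmp/intermediate")

events = [
    # 1. The parent application job, with no parent of its own here.
    start_event("my_spark_app", app_run_id),
    # 2. Reads the source and writes the intermediate dataset.
    start_event("my_spark_app.build_intermediate", str(uuid.uuid4()), parent=app,
                inputs=[("gs://bucket", "raw/orders")], outputs=[intermediate]),
    # 3. Reads the intermediate dataset and writes the final output.
    start_event("my_spark_app.write_final", str(uuid.uuid4()), parent=app,
                inputs=[intermediate], outputs=[("gs://bucket", "final/orders")]),
]
```

Both child events carry the application's Run id in their parent facet, and the intermediate dataset appears as an output of the second event and an input of the third.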
How to Use the Integration
Adding OpenLineage metadata collection to existing Spark jobs was designed to be straightforward
and unobtrusive to the application. The Spark integration works either as a javaagent or as a
SparkListener.
SparkListener
The SparkListener approach is very simple and covers most cases. The listener analyzes events as
they are posted by the SparkContext and extracts the job and dataset metadata exposed by the RDD
and DataFrame dependency graphs. Most data sources, such as filesystem sources (including S3 and
GCS), JDBC backends, and warehouses such as Redshift and BigQuery, can be analyzed and reported in
this way.
spark-submit
The listener can be enabled by adding the following configuration to a spark-submit command:
spark-submit --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
--packages "io.openlineage:openlineage-spark:0.2.+" \
--conf "spark.openlineage.host=http://<your_ol_endpoint>" \
--conf "spark.openlineage.namespace=my_job_namespace" \
--class com.mycompany.MySparkApp my_application.jar
Additional configuration can be set if applicable:
| Configuration Key | Description | Default |
|---|---|---|
| spark.openlineage.parentJobName | The job name of the parent job that triggered this Spark application | |
| spark.openlineage.parentRunId | The RunId of the parent job Run that triggered this Spark application | |
| spark.openlineage.apiKey | The API Key used to authenticate with the OpenLineage server that collects events | |
| spark.openlineage.version | The API version of the OpenLineage specification | 1 |
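When the command is assembled by a script rather than typed by hand, it may help to build the argument list programmatically. The following is a minimal sketch using only the configuration keys listed above; the `build_spark_submit` helper, the example endpoint, and the parent job values are hypothetical.

```python
import shlex


def build_spark_submit(app_jar, main_class, host, namespace, extra_conf=None,
                       package="io.openlineage:openlineage-spark:0.2.+"):
    """Return a spark-submit argv list with the OpenLineage listener enabled."""
    conf = {
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.host": host,
        "spark.openlineage.namespace": namespace,
    }
    # Optional keys (parentJobName, parentRunId, apiKey, version) go here.
    conf.update(extra_conf or {})

    argv = ["spark-submit", "--packages", package]
    for key, value in conf.items():
        argv += ["--conf", f"{key}={value}"]
    argv += ["--class", main_class, app_jar]
    return argv


argv = build_spark_submit(
    "my_application.jar",
    "com.mycompany.MySparkApp",
    host="http://localhost:5000",            # hypothetical endpoint
    namespace="my_job_namespace",
    extra_conf={
        "spark.openlineage.parentJobName": "nightly_dag.submit_spark",  # hypothetical
        "spark.openlineage.parentRunId": "ea9badc5-7cb2-49af-9a9f-155771d3a797",
    },
)
print(shlex.join(argv))  # shell-quoted form, suitable for logging
```

Passing the list directly to a process launcher avoids shell-quoting problems; `shlex.join` (Python 3.8+) is used here only to display the command.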
spark-defaults.conf
Alternatively, the same configuration parameters can be added to the spark-defaults.conf file on
cluster creation. Add the following key/value parameters to the spark-defaults.conf file:
spark.jars.packages io.openlineage:openlineage-spark:0.2.+
spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.host http://<your_ol_endpoint>
spark.openlineage.namespace my_job_namespace
The optional keys defined above can also be added here. When the job is submitted, additional or
overriding configuration values can be supplied. For example, spark.openlineage.host and
spark.openlineage.namespace can be defined in the spark-defaults.conf file, while
spark.openlineage.parentRunId and spark.openlineage.parentJobName can be supplied by the parent job
when it submits the job.
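The layering of cluster defaults and submit-time values can be sketched as a simple dictionary merge. This is an illustration only: the parser below handles just the whitespace-separated `key value` form shown above (not the full properties syntax Spark accepts), and `parse_spark_defaults` and the parent job values are hypothetical.

```python
DEFAULTS_TEXT = """\
spark.jars.packages io.openlineage:openlineage-spark:0.2.+
spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.host http://<your_ol_endpoint>
spark.openlineage.namespace my_job_namespace
"""


def parse_spark_defaults(text):
    """Parse whitespace-separated key/value lines, skipping blanks and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        conf[parts[0]] = parts[1] if len(parts) > 1 else ""
    return conf


cluster_defaults = parse_spark_defaults(DEFAULTS_TEXT)

# Values the parent job passes with --conf at submit time (hypothetical values).
submit_time = {
    "spark.openlineage.parentJobName": "nightly_dag.submit_spark",
    "spark.openlineage.parentRunId": "ea9badc5-7cb2-49af-9a9f-155771d3a797",
    "spark.openlineage.namespace": "override_namespace",
}

# Submit-time values take precedence over spark-defaults.conf.
effective = {**cluster_defaults, **submit_time}
for key in sorted(effective):
    print(key, effective[key])
```

Here the host comes from the cluster defaults, the parent identifiers come from the submitting job, and the namespace is overridden at submit time.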
Javaagent
The javaagent approach is the earliest approach to adding lineage events. It was intended to support
instrumenting Spark code directly by manipulating bytecode at runtime. For DataFrame or RDD code
that doesn't expose the underlying data source directly, the javaagent approach allows injecting
bytecode at runtime to expose the required information. This approach requires the ability to add
the openlineage-spark jar to the driver host and to set the correct JVM startup parameters. This may
not be possible on, e.g., a serverless Spark platform such as AWS Glue.
spark-submit
The following configuration must be added to the spark-submit command when the job is submitted:
spark-submit --conf "spark.driver.extraJavaOptions=-javaagent:<openlineage-spark-jar-location>=http://<your_ol_endpoint>/api/v1/namespaces/<your_job_namespace>/?api_key=<optional_api_key>"
If a parent job run is triggering the Spark job run, the parent job's name and Run id can be included as such:
spark-submit --conf "spark.driver.extraJavaOptions=-javaagent:<openlineage-spark-jar-location>=http://<your_ol_endpoint>/api/v1/namespaces/<your_job_namespace>/jobs/<parent_job_name>/runs/<parent_run_id>?api_key=<optional_api_key>"
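Because the javaagent argument packs the jar location, endpoint, namespace, optional parent run, and optional API key into one string, a small helper can reduce copy-and-paste mistakes. The sketch below simply mirrors the two URL shapes shown above; `javaagent_option` and all example values are hypothetical, and no escaping is applied because how the agent parses special characters is not specified here.

```python
def javaagent_option(jar_path, endpoint, namespace,
                     parent_job=None, parent_run_id=None, api_key=None):
    """Build the -javaagent value for spark.driver.extraJavaOptions."""
    url = f"{endpoint.rstrip('/')}/api/v1/namespaces/{namespace}"
    if parent_job and parent_run_id:
        # Form used when a parent job run triggered this Spark job.
        url += f"/jobs/{parent_job}/runs/{parent_run_id}"
    else:
        # Form without a parent run ends the path with a slash.
        url += "/"
    if api_key:
        url += f"?api_key={api_key}"
    return f"-javaagent:{jar_path}={url}"


standalone = javaagent_option(
    "/opt/jars/openlineage-spark.jar", "http://localhost:5000", "my_namespace")
with_parent = javaagent_option(
    "/opt/jars/openlineage-spark.jar", "http://localhost:5000", "my_namespace",
    parent_job="nightly_dag.submit_spark", parent_run_id="abc-123", api_key="secret")

print(standalone)
print(with_parent)
```

The returned string would then be passed as `--conf "spark.driver.extraJavaOptions=<value>"`, quoted so the shell does not interpret the `?` character.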
From Airflow
The same parameters passed to spark-submit can be supplied from Airflow and other schedulers. If
using the openlineage-airflow integration, each task in the DAG has its own Run id, which can be
connected to the Spark job run via the spark.openlineage.parentRunId parameter. For example, here
is a DataProcPySparkOperator that submits a PySpark application on Dataproc:
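# Import path assumes Airflow 1.10.x, where this operator lives under airflow.contrib.
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t1 = DataProcPySparkOperator(
    task_id=job_name,
    gcp_conn_id='google_cloud_default',
    project_id='project_id',
    cluster_name='cluster-name',
    region='us-west1',
    main='gs://bucket/your-prog.py',
    job_name=job_name,
    dataproc_pyspark_properties={
        # Register the listener and pull the integration jar from Maven.
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.jars.packages": "io.openlineage:openlineage-spark:0.2.+",
        # {{{{ ... }}}} in an f-string yields a literal {{ ... }} Jinja expression.
        "spark.openlineage.url": f"{openlineage_url}/api/v1/namespaces/{openlineage_namespace}/jobs/dump_orders_to_gcs/runs/{{{{task_run_id(run_id, task)}}}}?api_key={api_key}"
    },
    dag=dag)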
The same job can be submitted using the javaagent approach:
t1 = DataProcPySparkOperator(
    task_id=job_name,
    gcp_conn_id='google_cloud_default',
    project_id='project_id',
    cluster_name='cluster-name',
    region='us-west1',
    main='gs://bucket/your-prog.py',
    job_name=job_name,
    dataproc_pyspark_properties={
        # The jar name is relative because `files` copies it to the working directory.
        'spark.driver.extraJavaOptions':
            f"-javaagent:openlineage-spark-0.2.2.jar={openlineage_url}/api/v1/namespaces/{openlineage_namespace}/jobs/dump_orders_to_gcs/runs/{{{{task_run_id(run_id, task)}}}}?api_key={api_key}"
    },
    # `files` expects a list of URIs.
    files=["https://repo1.maven.org/maven2/io/openlineage/openlineage-spark/0.2.2/openlineage-spark-0.2.2.jar"],
    dag=dag)
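Both operator examples rely on quadruple braces inside an f-string, which is easy to get wrong. The short check below shows what Python actually produces: the f-string collapses `{{{{` and `}}}}` into literal `{{` and `}}`, leaving an expression that Airflow is then expected to render as a Jinja template at run time. The endpoint, namespace, and API key values are hypothetical placeholders.

```python
openlineage_url = "http://localhost:5000"    # hypothetical endpoint
openlineage_namespace = "my_namespace"       # hypothetical namespace
api_key = "secret"                           # hypothetical API key

url = (
    f"{openlineage_url}/api/v1/namespaces/{openlineage_namespace}"
    f"/jobs/dump_orders_to_gcs/runs/{{{{task_run_id(run_id, task)}}}}"
    f"?api_key={api_key}"
)

# Python has substituted its own variables but left the Jinja expression intact.
print(url)
# http://localhost:5000/api/v1/namespaces/my_namespace/jobs/dump_orders_to_gcs/runs/{{task_run_id(run_id, task)}}?api_key=secret
```

Using only two braces instead of four would leave single braces in the result, which Jinja would not treat as an expression.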