Quickstart with Databricks
OpenLineage's Spark integration can be installed on Databricks using init scripts. Please note that Databricks on Google Cloud does not currently support the DBFS CLI, so the proposed solution will not work on Google Cloud until that feature is enabled.
Enable OpenLineage
Follow the steps below to enable OpenLineage on Databricks.
- Build the jar via Gradle or download the latest release.
- Configure the Databricks CLI with your desired workspace (for example, by running `databricks configure --token` and providing your workspace URL and a personal access token).
- Run `upload-to-databricks.sh` or `upload-to-databricks.ps1`. This will:
  - create a folder in DBFS to store the OpenLineage jar,
  - copy the jar to the DBFS folder,
  - copy the init script to the DBFS folder.
- Create an interactive or job cluster with the relevant Spark configs (a cluster-spec sketch follows this list):

  ```
  spark.openlineage.transport.type console
  spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener
  spark.openlineage.version v1
  ```

- Manually create `open-lineage-init-script.sh` through the Workspace section in the Databricks UI, pasting in the script content from this file.
- Point the cluster's init script to the previously created file. For example, if you created `open-lineage-init-script.sh` within `Shared`, then the init script should point to `/Shared/open-lineage-init-script.sh`. A user's workspace may be used as well. Alternatively, the init script can be located in S3. Please note that DBFS-located init scripts are no longer supported (as of September 2023).
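For reference, here is a minimal sketch of a cluster spec that ties the Spark configs and init script path together, written as a Python dict whose field names follow the Databricks Clusters API (`POST /api/2.0/clusters/create`). The cluster name, node type, runtime version, and worker count are placeholders, and the init script path assumes the file was created under `Shared` as described above.

```python
# A minimal sketch of a cluster spec for the Databricks Clusters API.
# Cluster name, node type, Spark runtime, and worker count are placeholders;
# adjust them to your workspace. The init script path assumes
# open-lineage-init-script.sh was created under /Shared as described above.
cluster_spec = {
    "cluster_name": "openlineage-quickstart",  # placeholder name
    "spark_version": "11.3.x-scala2.12",       # placeholder runtime version
    "node_type_id": "i3.xlarge",               # placeholder node type
    "num_workers": 1,
    "spark_conf": {
        "spark.openlineage.transport.type": "console",
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.version": "v1",
    },
    "init_scripts": [
        {"workspace": {"destination": "/Shared/open-lineage-init-script.sh"}}
    ],
}
```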
Please note that the init script approach is currently the only way to install OpenLineage on Databricks. The OpenLineage integration relies on a custom extra listener class, `io.openlineage.spark.agent.OpenLineageSparkListener`, which has to be available on the classpath at driver startup. Providing it via `spark.jars.packages` does not work on the Databricks platform as of August 2022.
Verify Initialization
A successful initialization will emit logs in the Log4j output that look similar to the following:

```
YY/MM/DD HH:mm:ss INFO SparkContext: Registered listener io.openlineage.spark.agent.OpenLineageSparkListener
YY/MM/DD HH:mm:ss INFO OpenLineageContext: Init OpenLineageContext: Args: ArgumentParser(host=https://YOURHOST, version=v1, namespace=YOURNAMESPACE, jobName=default, parentRunId=null, apiKey=Optional.empty) URI: https://YOURHOST/api/v1/lineage
YY/MM/DD HH:mm:ss INFO AsyncEventQueue: Process of event SparkListenerApplicationStart(Databricks Shell,Some(app-XXX-0000),YYYY,root,None,None,None) by listener OpenLineageSparkListener took Xs.
```
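If you would rather check from a notebook than scan the logs, one quick way (assuming the cluster Spark configs above were applied) is to read back the `spark.extraListeners` setting:

```python
# Quick sanity check from a Databricks notebook, where the `spark` session is
# predefined. This raises an error if spark.extraListeners was never set.
print(spark.sparkContext.getConf().get("spark.extraListeners"))
# Expected to include: io.openlineage.spark.agent.OpenLineageSparkListener
```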
Create a Dataset
Open a notebook and create an example dataset with:

```python
spark.createDataFrame([
    {'a': 1, 'b': 2},
    {'a': 3, 'b': 4}
]).write.mode("overwrite").saveAsTable("default.temp")
```
Observe OpenLineage Events
To troubleshoot or observe OpenLineage information in Databricks, see the Log4j output in the cluster definition's Driver Logs. The Log4j output should contain entries starting with `INFO ConsoleTransport` that contain the generated OpenLineage events:
{"eventType":"COMPLETE","eventTime":"2022-08-01T08:36:21.633Z","run":{"runId":"64537bbd-00ac-498d-ad49-1c77e9c2aabd","facets":{"spark_unknown":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","inputs":[{"description":{"@class":"org.apache.spark.sql.catalyst.analysis.ResolvedTableName","id":1,"traceEnabled":false,"streaming":false,"cacheId":{"id":2,"empty":true,"defined":false},"canonicalizedPlan":false,"defaultTreePatternBits":{"id":3}},"inputAttributes":[],"outputAttributes":[]},{"description":{"@class":"org.apache.spark.sql.execution.LogicalRDD","id":1,"streaming":false,"traceEnabled":false,"cacheId":{"id":2,"empty":true,"defined":false},"canonicalizedPlan":false,"defaultTreePatternBits":{"id":3}},"inputAttributes":[],"outputAttributes":[{"name":"a","type":"long","metadata":{}},{"name":"b","type":"long","metadata":{}}]}]},"spark.logicalPlan":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","plan":[{"class":"org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect","num-children":2,"name":0,"partitioning":[],"query":1,"tableSpec":null,"writeOptions":null,"orCreate":true},{"class":"org.apache.spark.sql.catalyst.analysis.ResolvedTableName","num-children":0,"catalog":null,"ident":null},{"class":"org.apache.spark.sql.execution.LogicalRDD","num-children":0,"output":[[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"a","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":18,"jvmId":"481bebf6-f861-400e-bb00-ea105ed8afef"},"qualifier":[]}],[{"class":"org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children":0,"name":"b","dataType":"long","nullable":true,"metadata":{},"exprId":{"product-class":"org.apache.spark.sql.catalyst.expressions.ExprId","id":19,"jvmId":"481bebf6-f861-400e-bb00-ea105ed8afef"},"qualifier":[]}]],"rdd":null,"outputPartitioning":{"product-class":"org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning","numPartitions":0},"outputOrdering":[],"isStreaming":false,"session":null}]},"spark_version":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunFacet","spark-version":"3.2.1","openlineage-spark-version":"0.12.0-SNAPSHOT"}}},"job":{"namespace":"default","name":"databricks_shell.atomic_replace_table_as_select","facets":{}},"inputs":[],"outputs":[{"namespace":"dbfs","name":"/user/hive/warehouse/temp","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"dbfs","uri":"dbfs"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"a","type":"long"},{"name":"b","type":"long"}]},"storage":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/StorageDatasetFacet.json#/$defs/StorageDatasetFacet","storageLaye
r":"delta","fileFormat":"parquet"},"lifecycleStateChange":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet","lifecycleStateChange":"OVERWRITE"}},"outputFacets":{}}],"producer":"https://github.com/OpenLineage/OpenLineage/tree/0.12.0-SNAPSHOT/integration/spark","schemaURL":"https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunEvent"}
The generated JSON contains the output dataset name and location (`{"namespace":"dbfs","name":"/user/hive/warehouse/temp"}`), the schema fields (`[{"name":"a","type":"long"},{"name":"b","type":"long"}]`), and more.
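To make that structure concrete, here is a minimal sketch that parses a trimmed-down copy of the event above and pulls out exactly those fields; the `event_json` value keeps only the parts referenced in this section.

```python
import json

# A trimmed-down copy of the ConsoleTransport event above, keeping only the
# output dataset name and the schema facet referenced in this section.
event_json = '''
{"eventType": "COMPLETE",
 "outputs": [{"namespace": "dbfs",
              "name": "/user/hive/warehouse/temp",
              "facets": {"schema": {"fields": [{"name": "a", "type": "long"},
                                               {"name": "b", "type": "long"}]}}}]}
'''

event = json.loads(event_json)
for output in event["outputs"]:
    print(output["namespace"], output["name"])   # dbfs /user/hive/warehouse/temp
    for field in output["facets"]["schema"]["fields"]:
        print(field["name"], field["type"])      # a long, then b long
```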