
Capturing dataset statistics in Apache Spark

Paweł Leszczyński
OpenLineage Committer

OpenLineage events enable the creation of a consistent lineage graph, where dataset vertices are connected through the jobs that read from and write to them. This graph becomes even more valuable when its nodes and edges are enriched with additional metadata for practical use cases. One important aspect to capture is the amount of data processed, as it supports applications such as cost estimation and data quality monitoring. In this post, we introduce recent developments in Spark dataset statistics collection and reporting within OpenLineage events. We outline the basic statistics included, as well as the detailed scan and commit reports generated when processing Iceberg datasets.

Input and output facets

OpenLineage events are made of three main building blocks: runs, jobs, and datasets. Metadata about processed data presents an interesting challenge: on one hand, it is a property of a run; on the other hand, it makes sense only in the context of a given dataset. For this purpose, inputFacets and outputFacets are attached to datasets, although logically they describe a dataset in the context of a given run.
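To make this concrete, here is a minimal consumer-side sketch; the raw_event_json value is a placeholder for an event you capture yourself (for example, copied from the Docker logs of the demo below). It shows that inputFacets hang off each entry of the event's inputs array and outputFacets off outputs, even though the values they carry describe a single run.

import json

# Placeholder: paste a single OpenLineage run event copied from the logs here.
raw_event_json = '{"inputs": [], "outputs": []}'
event = json.loads(raw_event_json)

# inputFacets / outputFacets sit on the dataset entries of the run event,
# next to the regular dataset facets.
for ds in event.get("inputs", []):
    print("input", ds["namespace"], ds["name"], ds.get("inputFacets", {}))
for ds in event.get("outputs", []):
    print("output", ds["namespace"], ds["name"], ds.get("outputFacets", {}))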

Basic input and output statistics

The OpenLineage spec has included the OutputStatisticsOutputDatasetFacet for some time. Recently, we added the InputStatisticsInputDatasetFacet. Both facets contain basic statistics such as the number of rows processed, size in bytes, and file count.

We can demonstrate them in action by running the Jupyter quickstart demo, which emits OpenLineage events to the Docker logs. We start a Spark session with:

from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local')
    .appName('sample_spark')
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.12:1.26.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    .config('spark.openlineage.transport.type', 'console')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate())
spark.sparkContext.setLogLevel("INFO")

Then we load the example flights dataset into a DataFrame:

!wget https://github.com/plotly/datasets/raw/refs/heads/master/2015_flights.parquet
flights = spark.read.parquet("2015_flights.parquet")

When running a simple query on the dataset

flights \
    .filter("distance < 150") \
    .write \
    .mode("overwrite") \
    .saveAsTable("short_flights")

we can see the input statistics facet:

{
  "inputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json#/$defs/InputStatisticsInputDatasetFacet",
    "size": 25238218,
    "fileCount": 1
  }
}

and the output statistics facet:

{
  "outputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet",
    "rowCount": 214307,
    "size": 878649,
    "fileCount": 1
  }
}

A limitation of this approach is that the input row count is collected only for datasets read through the Spark DataSource V2 API.
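Before moving on to Iceberg, here is a toy sketch of the data quality angle mentioned in the introduction: a consumer can compare the reported output statistics against expectations. The threshold below is purely illustrative, and the event payload is again a placeholder you would capture yourself.

import json

# Placeholder: paste the OpenLineage event emitted for the write above.
raw_event_json = '{"outputs": []}'
event = json.loads(raw_event_json)

# Purely illustrative threshold for a toy data quality check.
MIN_EXPECTED_ROWS = 100_000

for ds in event.get("outputs", []):
    stats = ds.get("outputFacets", {}).get("outputStatistics")
    if stats and stats.get("rowCount", 0) < MIN_EXPECTED_ROWS:
        print(f"warning: {ds['name']} wrote only {stats['rowCount']} rows")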

Iceberg metrics reported with OpenLineage

A much more detailed report is collected when processing Iceberg datasets. Iceberg provides extensive statistics about datasets through its Metrics API. When a MetricsReporter is configured, it gets notified with scan and commit metrics. By default, a LoggingMetricsReporter is configured, but the API allows custom reporters to be implemented. This is what the OpenLineage Spark agent does: it instantiates an OpenLineageMetricsReporter and injects it into the Iceberg catalog.
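For reference, the same extension point is also reachable through catalog configuration. The sketch below assumes Iceberg's metrics-reporter-impl catalog property and names the built-in LoggingMetricsReporter explicitly; none of this is required for the facets in this post, since the OpenLineage agent injects its reporter for you.

from pyspark.sql import SparkSession

# A minimal sketch, assuming Iceberg's "metrics-reporter-impl" catalog property,
# which points the catalog at a MetricsReporter implementation. The OpenLineage
# Spark agent injects its own OpenLineageMetricsReporter automatically, so this
# override is only relevant for custom setups.
spark = (SparkSession.builder.master('local')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    .config("spark.sql.catalog.spark_catalog.metrics-reporter-impl",
            "org.apache.iceberg.metrics.LoggingMetricsReporter")
    .getOrCreate())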

Statistics sent to the OpenLineageMetricsReporter are then collected and emitted with OpenLineage events. This is possible because each report contains a snapshot id, which is also present as the dataset version within OpenLineage events. As a result, we can correlate Iceberg reports with OpenLineage datasets and attach the IcebergScanReportInputDatasetFacet and IcebergCommitReportOutputDatasetFacet. The Iceberg dataset facets are modelled after the Iceberg metric result interfaces.
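A rough consumer-side sketch of that correlation is shown below. The facet keys icebergScanReport and version are assumptions based on OpenLineage naming conventions and may differ in your events; the idea is simply that both carry the Iceberg snapshot id, letting you join a scan report to a concrete dataset version.

import json

# Placeholder: paste an OpenLineage event for an Iceberg read here.
raw_event_json = '{"inputs": []}'
event = json.loads(raw_event_json)

for ds in event.get("inputs", []):
    # Assumed facet keys: "icebergScanReport" (input facet) and
    # "version" (regular dataset facet carrying the snapshot id).
    scan = ds.get("inputFacets", {}).get("icebergScanReport")
    version = ds.get("facets", {}).get("version", {}).get("datasetVersion")
    if scan and version and str(scan["snapshotId"]) == str(version):
        print(f"{ds['name']}: scan report matches snapshot {version}")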

We can run some operations on Iceberg datasets:

flights.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_flights")
spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance < 150") \
    .write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable("short_flights")

and the OpenLineage events will contain a scan report:

{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergScanReportInputDatasetFacet.json",
  "snapshotId": 4808226079342265000,
  "filterDescription": "(not_null(ref(name=\"DISTANCE\")) and ref(name=\"DISTANCE\") < \"(3-digit-int)\")",
  "projectedFieldNames": [
    "DEPARTURE_DELAY",
    "ARRIVAL_DELAY",
    "DISTANCE",
    "SCHEDULED_DEPARTURE"
  ],
  "scanMetrics": {
    "totalPlanningDuration": 32,
    "resultDataFiles": 1,
    "resultDeleteFiles": 0,
    "totalDataManifests": 1,
    "totalDeleteManifests": 0,
    "scannedDataManifests": 1,
    "skippedDataManifests": 0,
    "totalFileSizeInBytes": 21970429,
    "totalDeleteFileSizeInBytes": 0,
    "skippedDataFiles": 0,
    "skippedDeleteFiles": 0,
    "scannedDeleteManifests": 0,
    "skippedDeleteManifests": 0,
    "indexedDeleteFiles": 0,
    "equalityDeleteFiles": 0,
    "positionalDeleteFiles": 0
  },
  "metadata": {
    "engine-version": "3.5.0",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)",
    "app-id": "local-1734096828109",
    "engine-name": "spark"
  }
}

Commit reports for queries like the CTAS above will only be emitted with future Iceberg versions, due to a recently resolved issue (https://github.com/apache/iceberg/issues/11664) affecting commit metrics for CTAS queries.

However, this can still be tested with append operations:

spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance >= 150 AND distance < 160") \
    .write \
    .insertInto("short_flights")

which results in the following facet:

{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergCommitReportOutputDatasetFacet.json",
  "operation": "append",
  "commitMetrics": {
    "totalDuration": 64,
    "attempts": 1,
    "addedDataFiles": 1,
    "totalDataFiles": 2,
    "totalDeleteFiles": 0,
    "addedRecords": 37673,
    "totalRecords": 251980,
    "addedFilesSizeInBytes": 105467,
    "totalFilesSizeInBytes": 802832,
    "totalPositionalDeletes": 0,
    "totalEqualityDeletes": 0
  },
  "snapshotId": 586841490378901100,
  "metadata": {
    "engine-version": "3.5.0",
    "app-id": "local-1734527649353",
    "engine-name": "spark",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)"
  },
  "sequenceNumber": 2
}
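Note that the numbers line up across snapshots: totalRecords (251,980) equals the 214,307 rows written to short_flights earlier plus the 37,673 records added by this append, which illustrates how commit metrics accumulate over the table's history.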

Conclusion

When reading a gargantuan dataset, scan metrics provide details on the amount of data processed; commit metrics do the same for writes. These enhancements provide a more detailed view of Spark jobs to help you cost-effectively manage your data processing pipelines. The OpenLineage project is continuously evolving, and we are excited to see how these new features will be used in practice.