The OpenLineage Airflow Provider is Here
This one is big. With the release of Airflow 2.7.0, the Airflow integration is now officially an Airflow Provider. This means the openlineage-airflow package is now apache-airflow-providers-openlineage in Airflow itself – a built-in feature of Airflow rather than an externally managed integration. Why does it matter where the integration “lives”? The short answer: as an Airflow Provider, the integration will offer improved reliability, broader support for operators, enhanced lineage, and easier implementation in custom operators going forward.
Although still a work in progress in some key respects, the OpenLineage Provider promises to pay dividends to users and contributors alike while accelerating the growth of the OpenLineage Ecosystem.
Critical Improvements
Before 2.7.0, OpenLineage metadata was only available via a plugin implementation maintained in the OpenLineage project. In other words, the integration was an external package getting lineage from the outside. Being external to Airflow, the integration had to use extractors – special classes created for each supported operator – to get lineage. To function, these locally maintained extractors had to understand operators’ internals and know where to look for data. Although this was the best possible approach under the circumstances, it was hardly ideal. On the one hand, it was brittle because it depended on both operators’ and Airflow’s internals. On the other, it required extra work to maintain compatibility with new versions of providers and Airflow itself. We had to keep up with changes not only to operators but also to Airflow – which is not exactly a small, slowly moving project.
Improvements coming with the provider are not limited to fixes, however. The OpenLineage Provider promises to enable some long-sought-after enhancements, including support for one of the most-used Airflow operators – more about which below.
High-level Design
The provider approach solves these maintenance and reliability issues by moving the extraction logic, along with unit tests, to each provider. Although a lot of up-front work has gone into creating the provider, full implementation of this solution is distributed across the providers (and necessarily remains a work in progress). No longer self-contained, the integration is now part of the operator contract and belongs to every provider that supports OpenLineage. Relocating the extraction logic in this way makes the integration more robust by ensuring the stability of the lineage contract in each operator. Another benefit of the approach: adding lineage coverage to custom operators is now easier.
Implementation
The OpenLineage Provider has been implemented in Airflow by reimplementing the openlineage-airflow package from the OpenLineage project as the apache-airflow-providers-openlineage provider, which ships in the base Airflow Docker image and can be easily enabled by configuration. Furthermore, the lineage extraction logic that used to live in Extractors in that package is now implemented in the operators themselves, in their provider packages along with unit tests, eliminating the need for Extractors in most cases. For this purpose, operators can implement a new optional API: get_openlineage_facets_on_{start(), complete(ti), failure(ti)} (documented here).
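As a minimal sketch of that configuration (the URL and namespace below are placeholders), pointing the provider at an OpenLineage-compatible backend can look like this in airflow.cfg:

[openlineage]
transport = {"type": "http", "url": "http://localhost:5000"}
namespace = my_airflow_instance

The same options can also be supplied as environment variables, such as AIRFLOW__OPENLINEAGE__TRANSPORT.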
Example Operator
The Google Cloud Provider in Airflow is one of the providers to which extraction logic has been added. The get_openlineage_facets_on_complete() function in the gcs_to_gcs operator shows how easy adding OpenLineage support to an operator can be.
def get_openlineage_facets_on_complete(self, task_instance):
    """
    Implementing _on_complete because the execute method does preprocessing on internals.
    This means we won't have to normalize self.source_object and self.source_objects,
    destination bucket and so on.
    """
    from openlineage.client.run import Dataset

    from airflow.providers.openlineage.extractors import OperatorLineage

    return OperatorLineage(
        inputs=[
            Dataset(namespace=f"gs://{self.source_bucket}", name=source)
            for source in sorted(self.resolved_source_objects)
        ],
        outputs=[
            Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
            for target in sorted(self.resolved_target_objects)
        ],
    )
In this case, the operator itself presents us with the source and target buckets and the objects that will be copied. Implementing OpenLineage support requires only properly initializing the name and namespace of each Dataset according to the naming schema (for GCS, the namespace is the gs://{bucket} URI and the name is the object’s path within the bucket).
Implementing the Provider in Custom Operators
The OpenLineage Provider in Airflow makes implementing support for custom operators easy. In fact, there is now nothing stopping you from adding OpenLineage support to your own custom operator. The provider detects the OpenLineage methods and calls them when appropriate – before task execution, after success, or after failure. Also, you don’t have to add all three – the failure method falls back to the complete method if it’s not present, and the complete method to the start method.
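To illustrate, here is a minimal sketch of what that can look like. MyCopyOperator, its path arguments, and the file namespace are hypothetical; the pattern to note is the get_openlineage_facets_on_complete() method returning an OperatorLineage object, just as in the GCS example above:

from airflow.models.baseoperator import BaseOperator


class MyCopyOperator(BaseOperator):
    """A hypothetical operator that copies a single file between two locations."""

    def __init__(self, source_path, destination_path, **kwargs):
        super().__init__(**kwargs)
        self.source_path = source_path
        self.destination_path = destination_path

    def execute(self, context):
        ...  # the copy itself; irrelevant to lineage extraction

    def get_openlineage_facets_on_complete(self, task_instance):
        # Imports are local so the operator still loads when the
        # OpenLineage provider is not installed.
        from openlineage.client.run import Dataset

        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="file", name=self.source_path)],
            outputs=[Dataset(namespace="file", name=self.destination_path)],
        )

Because only the complete method is defined here, the provider will use it as the fallback for failed runs as well.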
Future Development
The OpenLineage Provider makes possible several sought-after enhancements, including:
- Integration with XCom datasets (Airflow AIP-48)
- Coverage of PythonOperator, the most-used operator in Airflow, including TaskFlow support
- Support for Hooks, which would track their own lineage to be collected by the PythonOperator and presented as its own lineage
Supported Operators
The OpenLineage Provider currently supports the following operators, with support for additional operators coming soon:
- Apache Kafka
- AWS SageMaker
- GCS
- Common-SQL, including support for multiple databases like Postgres and MySQL
- MS Azure
- Snowflake
We welcome contributions and feedback on operator support and will be happy to help anyone get started adding extraction logic to an existing or custom operator.
Additional Resources
If you are interested in participating in the effort to add support for more operators, reach out to us on Slack.
For background on the architecture and implementation plan, read the proposal.
For guides on getting started with OpenLineage, read the docs.