Exposing Lineage in Airflow Operators
This page describes the external OpenLineage integration, which applies mainly to Airflow versions earlier than 2.7.
If you're using Airflow 2.7+, see the native Airflow OpenLineage provider documentation.
Ongoing development and enhancements will be focused on the apache-airflow-providers-openlineage package, while openlineage-airflow will primarily be updated for bug fixes. See all Airflow versions supported by this integration.
OpenLineage 0.17.0+ makes it easy to add lineage to your data pipelines by supporting lineage extraction directly from Airflow operators. This means that custom operators, whether built in-house or forked from another project, can provide you and your team with lineage data without requiring any modification of the OpenLineage project. The data will still go to your lineage backend of choice, most commonly specified via the OPENLINEAGE_URL environment variable.
Lineage extraction works a bit differently under the hood starting with OpenLineage 0.17.0. While extractors in the OpenLineage project have a getter method for the operator names they are associated with, the default extractor looks for two specific methods on the operator itself and calls them directly if found. This means that implementation now consists of just two methods in your operator.
Those methods are get_openlineage_facets_on_start() and get_openlineage_facets_on_complete(), called when the operator is first scheduled to run and when the operator has finished execution, respectively. Either, or both, of the methods may be implemented by the operator.
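For orientation, here is a minimal, hypothetical skeleton of what the default extractor looks for. The operator name and dataset values below are illustrative placeholders, and the real facet-building logic is omitted:
from airflow.models import BaseOperator
from openlineage.airflow.extractors.base import OperatorLineage
from openlineage.client.run import Dataset


class MyCustomOperator(BaseOperator):  # hypothetical operator, for illustration only
    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        # Called when the operator is first scheduled to run; report what is known up front.
        return OperatorLineage(
            inputs=[Dataset(namespace="example_source", name="example_table", facets={})],
            outputs=[],
            run_facets={},
            job_facets={},
        )

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        # Called after execution finishes; can enrich the start-time lineage
        # with information gathered at runtime.
        return self.get_openlineage_facets_on_start()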
In the rest of this doc, you will see how to write these methods within an operator class called DfToGcsOperator. This operator moves a DataFrame from an arbitrary source table, using a supplied Python callable, to a specified path in GCS. A thorough understanding of the __init__() and execute() methods of the operator is not required, but an abbreviated version of each is given below for context. The final two methods in the class are get_openlineage_facets_on_start() and get_openlineage_facets_on_complete(), which we will implement piece by piece in the rest of the doc. They are provided here in their entirety for completeness.
from airflow.models import BaseOperator

from openlineage.airflow.extractors.base import OperatorLineage
from openlineage.client.facet import (
    DataSourceDatasetFacet,
    DocumentationJobFacet,
    ErrorMessageRunFacet,
    OwnershipJobFacet,
    OwnershipJobFacetOwners,
    SchemaDatasetFacet,
    SchemaField,
)
from openlineage.client.run import Dataset
class DfToGcsOperator(BaseOperator):
    def __init__(
        self,
        task_id,
        python_callable,
        data_source,
        security_group,
        pipeline_phase,
        bucket=None,
        table=None,
        col_types=None,
        check_cols=True,
        **kwargs,
    ):
"""Initialize a DfToGcsOperator."""
super().__init__(task_id=task_id, **kwargs)
self.python_callable = python_callable
self.data_source = data_source
self.table = table if table is not None else task_id
self.bucket = bucket
self.security_group = security_group
self.pipeline_phase = pipeline_phase
        # col_types is a dict that stores expected column names and types.
self.col_types = col_types
self.check_cols = check_cols
self.base_path = "/".join(
[self.security_group, self.pipeline_phase, self.data_source, self.table]
)
        # Holds meta information about the dataframe (column names and types)
        # that is used in the extractor.
self.df_meta = None
def execute(self, context):
"""
Run a DfToGcs task.
The task will run the python_callable and save
the resulting dataframe to GCS under the proper object path
<security_group>/<pipeline_phase>/<data_source>/<table>/.
"""
...
df = get_python_callable_result(self.python_callable, context)
if len(df) > 0:
df.columns = [clean_column_name(c) for c in df.columns]
if self.col_types and self.check_cols:
check_cols = [c.lower().strip() for c in self.col_types.keys()]
missing = [m for m in check_cols if m not in df.columns]
assert (
len(missing) == 0
), "Columns present in col_types but not in DataFrame: " + ",".join(
missing
)
# ----------- #
# Save to GCS #
# ----------- #
# Note: this is an imported helper function.
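            # save_to_path is presumably built from self.base_path (plus a filename)
            # in the code elided above.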
df_to_gcs(df, self.bucket, save_to_path)
# ----------- #
# Return Data #
# ----------- #
# Allow us to extract additional lineage information
# about all of the fields available in the dataframe
            self.df_meta = self.extract_df_fields(df)
else:
print("Empty dataframe, no artifact saved to GCS.")
    @staticmethod
    def extract_df_fields(df):
        """Extract a list of SchemaFields from a DataFrame."""
        from openlineage.common.dataset import SchemaField
fields = []
for (col, dtype) in zip(df.columns, df.dtypes):
fields.append(SchemaField(name=col, type=str(dtype)))
return fields
def get_openlineage_facets_on_start(self):
"""Add lineage to DfToGcsOperator on task start."""
if not self.bucket:
ol_bucket = get_env_bucket()
else:
ol_bucket = self.bucket
input_uri = "://".join([self.data_source, self.table])
input_source = DataSourceDatasetFacet(
name=self.table,
uri=input_uri,
)
input_facet = {
"datasource": input_source,
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name=col_name, type=col_type)
for col_name, col_type in self.col_types.items()
]
),
}
input = Dataset(namespace=self.data_source, name=self.table, facets=input_facet)
output_namespace = "gs://" + ol_bucket
output_name = self.base_path
output_uri = "/".join(
[
output_namespace,
output_name,
]
)
output_source = DataSourceDatasetFacet(
name=output_name,
uri=output_uri,
)
output_facet = {
"datasource": output_source,
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name=col_name, type=col_type)
for col_name, col_type in self.col_types.items()
]
),
}
output = Dataset(
namespace=output_namespace,
name=output_name,
facets=output_facet,
)
return OperatorLineage(
inputs=[input],
outputs=[output],
run_facets={},
job_facets={
"documentation": DocumentationJobFacet(
description=f"""
Takes data from the data source {input_uri}
and puts it in GCS at the path: {output_uri}
"""
),
"ownership": OwnershipJobFacet(
owners=[OwnershipJobFacetOwners(name=self.owner, type=self.email)]
),
}
)
def get_openlineage_facets_on_complete(self, task_instance):
"""Add lineage to DfToGcsOperator on task completion."""
starting_facets = self.get_openlineage_facets_on_start()
if task_instance.task.df_meta is not None:
for i in starting_facets.inputs:
i.facets["SchemaDatasetFacet"].fields = task_instance.task.df_meta
else:
starting_facets.run_facets = {
"errorMessage": ErrorMessageRunFacet(
message="Empty dataframe, no artifact saved to GCS.",
programmingLanguage="python"
)
}
return starting_facets
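Because these are ordinary Python methods, they can also be exercised outside of a scheduled DAG run, which is handy while developing the facets. The following is a minimal sketch of such a check; the callable, bucket, column types, and other constructor values are illustrative placeholders only:
def _make_df():
    # Hypothetical callable; it is never invoked by this check.
    ...

op = DfToGcsOperator(
    task_id="load_example_table",          # placeholder values throughout
    python_callable=_make_df,
    data_source="example_source",
    security_group="analytics",
    pipeline_phase="raw",
    bucket="example-bucket",
    col_types={"id": "int64", "name": "object"},
)

lineage = op.get_openlineage_facets_on_start()
assert lineage.inputs[0].namespace == "example_source"
assert lineage.outputs[0].namespace == "gs://example-bucket"
assert lineage.outputs[0].name == "analytics/raw/example_source/load_example_table"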
Implementing lineage in an operator
Not surprisingly, you will need an operator class to implement lineage collection in an operator. Here, we'll use the DfToGcsOperator, a custom operator created by the Astronomer Data team to load arbitrary dataframes to our GCS bucket. We'll implement both get_openlineage_facets_on_start() and get_openlineage_facets_on_complete() for our custom operator. The specific details of the implementation will vary from operator to operator, but there will always be five basic steps that these methods share.
Both methods return an OperatorLineage object, which is itself a collection of facets. Four of the five steps mentioned above involve creating these facets where necessary, and the fifth is creating the DataSourceDatasetFacet. First, though, we'll need to import some OpenLineage objects:
from openlineage.airflow.extractors.base import OperatorLineage
from openlineage.client.facet import (
DataSourceDatasetFacet,
SchemaDatasetFacet,
SchemaField,
)
from openlineage.client.run import Dataset
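For orientation, the OperatorLineage object we will return is simply a container for the collections built in the remaining steps; an empty sketch looks like this:
# Shown empty here for illustration; the steps below populate each collection.
OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})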
Now, we'll start building the facets for the OperatorLineage object in the get_openlineage_facets_on_start() method.
1. DataSourceDatasetFacet
The DataSourceDatasetFacet is a simple object containing two fields, name and uri, which should be populated with the unique name of the data source and its URI. We'll make two of these objects: an input_source to specify where the data came from and an output_source to specify where the data is going.
A quick note about the philosophy behind the name and uri in the OpenLineage spec: the uri is built from the namespace and the name, and each is expected to be unique with respect to its environment. This means a namespace should be globally unique in the OpenLineage universe, and the name unique within its namespace. The two are then concatenated to form the uri, so that uri = namespace + name. The full naming spec can be found here.
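As a small illustration of that convention (the bucket and path below are placeholders), a GCS dataset's uri is composed the same way this operator composes it later in the doc:
# Illustrative values only, following the gs:// naming convention.
namespace = "gs://example-bucket"                     # globally unique: scheme + bucket
name = "analytics/raw/example_source/example_table"   # unique within that namespace
uri = "/".join([namespace, name])
# -> "gs://example-bucket/analytics/raw/example_source/example_table"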
In our case, the input name will be the table we are pulling data from, self.table, and the namespace will be our self.data_source.
input_source = DataSourceDatasetFacet(
name=self.table,
uri="://".join([self.data_source, self.table]),
)
The output data source object's name will always be the base path given to the operator, self.base_path. The namespace is always in GCS, so we use the OpenLineage spec's gs:// as the scheme and our bucket as the authority, giving us gs://{ol_bucket}. The uri is simply the concatenation of the two.
if not self.bucket:
ol_bucket = get_env_bucket()
else:
ol_bucket = self.bucket
output_namespace = "gs://" + ol_bucket
output_name = self.base_path
output_uri = "/".join(
[
output_namespace,
output_name,
]
)
output_source = DataSourceDatasetFacet(
name=output_name,
uri=output_uri,
)
2. Inputs
Next we'll create the input dataset object. As we are moving data from a dataframe to GCS in this operator, we'll make sure we capture all the information about the dataframe being extracted in a Dataset. To create the Dataset object, we'll need namespace, name, and facets objects. The first two are strings, and facets is a dictionary.
Our namespace will come from the operator, where we use self.data_source again. The name parameter for this facet will be the table, again coming from the operator's parameter list. The facets dictionary will contain two entries. The first has the key "datasource", with the input_source from the previous step as its value. The second has the key "schema", with the value being a SchemaDatasetFacet, which is itself a collection of SchemaField objects, one per column, created via a list comprehension over the operator's self.col_types parameter.
The inputs parameter to OperatorLineage is a list of Dataset objects, so we'll end up adding a single Dataset object to the list later. The creation of the Dataset object looks like the following:
input_facet = {
"datasource": input_source,
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name=col_name, type=col_type)
for col_name, col_type in self.col_types.items()
]
),
}
input = Dataset(namespace=self.data_source, name=self.table, facets=input_facet)