Custom Extractors
This page is about Airflow's external integration that works mainly for Airflow versions <2.7.
If you're using Airflow 2.7+, look at the native Airflow OpenLineage provider documentation.
Ongoing development and enhancements will be focused on the apache-airflow-providers-openlineage package, while the openlineage-airflow package will primarily be updated for bug fixes. See all Airflow versions supported by this integration.
This integration works by detecting which Airflow operators your DAG is using, and extracting lineage data from them using corresponding extractors.
However, not all operators are covered. In particular, operators from third-party providers may not be. To handle this situation, OpenLineage allows you to provide custom extractors for any operator that does not have a built-in one.
If you want to extract lineage from your own Operators, you may prefer directly implementing lineage support as described here.
Interface
Custom extractors have to derive from BaseExtractor.
Extractors have three methods to implement: extract, extract_on_complete and get_operator_classnames.
The last one is a classmethod that provides the list of operators that your extractor can get lineage from.
For example:
@classmethod
def get_operator_classnames(cls) -> List[str]:
    return ['PostgresOperator']
If the name of the operator matches one of the names on the list, the extractor will be instantiated, with the operator provided in the extractor's self.operator property, and both the extract and extract_on_complete methods will be called.
They are used to provide the actual lineage data. The difference is that extract is called before the operator's execute method, while extract_on_complete is called after it. The latter can be used to extract any additional information that the operator sets on its own properties during execution. A good example is SnowflakeOperator, which sets query_ids after execution.
Both methods return a TaskMetadata structure:
@attr.s
class TaskMetadata:
    name: str = attr.ib()  # deprecated
    inputs: List[Dataset] = attr.ib(factory=list)
    outputs: List[Dataset] = attr.ib(factory=list)
    run_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
    job_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
Inputs and outputs are lists of plain OpenLineage datasets.
run_facets and job_facets are dictionaries of optional RunFacets and JobFacets that are attached to the run and to the job, respectively. For example, you might want to attach SqlJobFacet if your operator is executing SQL.
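The interface described above can be sketched end to end as follows. This is a minimal illustration, not the integration's own code: TaskMetadata and BaseExtractor are redefined here as simplified stand-ins so the example runs without Airflow installed, MyQueryOperator and MyQueryExtractor are hypothetical names, the facets are plain dictionaries rather than real facet classes, and the task_instance parameter of extract_on_complete is an assumption about the method's signature.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


# Simplified stand-ins (assumptions) so the sketch runs without Airflow.
# A real extractor would use the BaseExtractor and TaskMetadata classes
# shipped with the openlineage-airflow package instead.
@dataclass
class TaskMetadata:
    name: str
    inputs: List[Any] = field(default_factory=list)
    outputs: List[Any] = field(default_factory=list)
    run_facets: Dict[str, Any] = field(default_factory=dict)
    job_facets: Dict[str, Any] = field(default_factory=dict)


class BaseExtractor:
    def __init__(self, operator: Any) -> None:
        self.operator = operator


class MyQueryOperator:
    """Hypothetical operator: knows its SQL up front, sets query_ids on execute."""

    def __init__(self, task_id: str, sql: str) -> None:
        self.task_id = task_id
        self.sql = sql
        self.query_ids: List[str] = []

    def execute(self) -> None:
        self.query_ids = ["query-1"]  # only available after execution


class MyQueryExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Matched against the class name of the operator used by the task.
        return ["MyQueryOperator"]

    def extract(self) -> TaskMetadata:
        # Runs before execute(): only statically known data is available.
        return TaskMetadata(
            name=self.operator.task_id,
            job_facets={"sql": {"query": self.operator.sql}},  # placeholder facet
        )

    def extract_on_complete(self, task_instance: Any) -> TaskMetadata:
        # Runs after execute(): properties set during the run are now visible.
        metadata = self.extract()
        metadata.run_facets["queryIds"] = {"ids": list(self.operator.query_ids)}
        return metadata


# Mimic the call order described above.
operator = MyQueryOperator(task_id="load_orders", sql="SELECT 1")
extractor = MyQueryExtractor(operator)
before = extractor.extract()
operator.execute()
after = extractor.extract_on_complete(task_instance=None)
```

Here before carries only the job facet, while after also carries the run facet built from query_ids, which mirrors the reason for having two separate methods.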
To learn more about facets in OpenLineage, please visit this section.
Registering custom extractor
The OpenLineage integration does not know that you've provided an extractor unless you register it.
The way to do that is to add its full import path to the OPENLINEAGE_EXTRACTORS environment variable.
OPENLINEAGE_EXTRACTORS=full.path.to.ExtractorClass
If you have multiple custom extractors, separate the paths with a semicolon (;).
OPENLINEAGE_EXTRACTORS=full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
Optionally, you can add whitespace between the semicolon-separated paths. This is useful if you're providing them as part of a YAML file.
OPENLINEAGE_EXTRACTORS: >-
  full.path.to.FirstExtractor;
  full.path.to.SecondExtractor
Remember to make sure that the path is importable by both the scheduler and the worker.
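To make the separator rules concrete, here is a small sketch of how such a value can be split into individual paths. The helper parse_extractor_paths is hypothetical and written for illustration only; it is not the integration's own parsing code.

```python
import os
from typing import List, Optional


def parse_extractor_paths(raw: Optional[str]) -> List[str]:
    """Split a semicolon-separated value and strip whitespace around each path."""
    if not raw:
        return []
    return [part.strip() for part in raw.split(";") if part.strip()]


# A YAML folded scalar (>-) joins its lines with single spaces, so the
# two-line YAML value above arrives as one string like this:
folded = "full.path.to.FirstExtractor; full.path.to.SecondExtractor"
paths = parse_extractor_paths(folded)
print(paths)

# At runtime the value would come from the environment instead.
print(parse_extractor_paths(os.environ.get("OPENLINEAGE_EXTRACTORS")))
```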
Adding extractor to OpenLineage Airflow integration package
All OpenLineage extractors are defined in this path.
In order to add a new extractor, you should put your code in this directory. Additionally, you need to add the class to the _extractors list in extractors.py, e.g.:
_extractors = list(
    filter(
        lambda t: t is not None,
        [
            try_import_from_string(
                'openlineage.airflow.extractors.postgres_extractor.PostgresExtractor'
            ),
            ... # other extractors are listed here
+           try_import_from_string(
+               'openlineage.airflow.extractors.new_extractor.ExtractorClass'
+           ),
        ]
    )
)
Debugging issues
There are two common problems associated with custom extractors.
The first is a wrong path provided to OPENLINEAGE_EXTRACTORS.
The path needs to be exactly the same as the one you'd use from your code. If the path is wrong or not importable from the worker, the plugin will fail to load the extractors and proper OpenLineage events for that operator won't be emitted.
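One way to catch a wrong path early is to try importing it yourself in the same environment the worker uses. The sketch below relies only on the standard library; import_from_path and check_extractor_path are hypothetical helper names, not functions provided by OpenLineage.

```python
import importlib
from typing import Any


def import_from_path(path: str) -> Any:
    """Resolve a dotted path such as 'package.module.ClassName' to an object."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a dotted path to a class")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_path!r} has no attribute {attr_name!r}"
        ) from exc


def check_extractor_path(path: str) -> bool:
    """Return True if the path can be imported, printing the reason otherwise."""
    try:
        import_from_path(path)
    except ImportError as exc:
        print(f"cannot import {path}: {exc}")
        return False
    return True
```

Running check_extractor_path with each entry of OPENLINEAGE_EXTRACTORS on a worker shows whether the failure is a missing module or a misspelled class name.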
The second, and maybe more insidious, problem is imports from Airflow. Because OpenLineage code is instantiated when the Airflow worker itself starts, any import from Airflow can turn out to be cyclical without being noticed. This causes OpenLineage extraction to fail.
To avoid this issue, import from Airflow only locally, inside the extract or extract_on_complete methods. If you need imports for type checking, guard them behind typing.TYPE_CHECKING.
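The import pattern recommended above might look like the following sketch. MyExtractor and MyOperator are hypothetical names, and the class is shown without its BaseExtractor parent so that the snippet can be loaded without Airflow installed; a real extractor would derive from BaseExtractor and return TaskMetadata.

```python
from typing import TYPE_CHECKING, Any, List, Optional

if TYPE_CHECKING:
    # Seen only by type checkers and skipped at runtime, so it cannot
    # take part in an import cycle when the worker starts.
    from airflow.models import BaseOperator


class MyExtractor:  # a real extractor would derive from BaseExtractor
    def __init__(self, operator: "BaseOperator") -> None:
        # The annotation is a string, so the name is never evaluated at runtime.
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["MyOperator"]

    def extract(self) -> Optional[Any]:
        # Local import: Airflow is loaded only when extraction actually runs,
        # not when this module is imported.
        from airflow.hooks.base import BaseHook  # noqa: F401

        # Build and return the task metadata here; None is a placeholder.
        return None
```

Importing this module does not import anything from Airflow; that only happens once extract is called.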
You can also check the Development section to learn more about how to set up a development environment and create tests.