This integration works by detecting which Airflow operators your DAG is using, and extracting lineage data from them using corresponding extractors.
However, not all operators are covered. In particular, operators from third-party providers may not be. To handle this situation, OpenLineage allows you to provide a custom extractor for any operator that lacks a built-in one.
If you want to extract lineage from your own operators, you may prefer to implement lineage support directly, as described here.
Custom extractors have to derive from the `BaseExtractor` class.
Extractors have three methods to implement: `extract`, `extract_on_complete`, and `get_operator_classnames`. The last one is a classmethod that provides the list of operators your extractor can get lineage from:

```python
@classmethod
def get_operator_classnames(cls) -> List[str]:
    ...
```

If the name of the operator matches one of the names on the list, the extractor will be instantiated - with the operator provided in the extractor's `self.operator` property - and both the `extract` and `extract_on_complete` methods will be called.
They are used to provide the actual lineage data. The difference is that `extract` is called before the operator's `execute` method, while `extract_on_complete` is called after it. The latter can be used to pick up any additional information that the operator sets on its own properties during execution. A good example is `SnowflakeOperator`, which sets `query_ids` after execution.
Both methods return a `TaskMetadata` structure:

```python
@attr.s
class TaskMetadata:
    name: str = attr.ib()  # deprecated
    inputs: List[Dataset] = attr.ib(factory=list)
    outputs: List[Dataset] = attr.ib(factory=list)
    run_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
    job_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
```
`inputs` and `outputs` are lists of plain OpenLineage `Dataset` objects.
To learn more about facets in OpenLineage, please visit this section.
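Putting the pieces together, a minimal custom extractor might look like the sketch below. To keep the example self-contained, `Dataset`, `TaskMetadata`, and `BaseExtractor` are stubbed with simplified stand-ins - in real code you would import them from the OpenLineage packages instead - and `MyOperator`, together with its `source_table` and `target_table` attributes, is hypothetical:

```python
from typing import Dict, List, Optional


class Dataset:
    """Simplified stand-in for the OpenLineage Dataset class."""

    def __init__(self, namespace: str, name: str) -> None:
        self.namespace = namespace
        self.name = name


class TaskMetadata:
    """Simplified stand-in for the real TaskMetadata structure."""

    def __init__(self, name, inputs=None, outputs=None,
                 run_facets=None, job_facets=None) -> None:
        self.name = name  # deprecated
        self.inputs: List[Dataset] = inputs or []
        self.outputs: List[Dataset] = outputs or []
        self.run_facets: Dict[str, object] = run_facets or {}
        self.job_facets: Dict[str, object] = job_facets or {}


class BaseExtractor:
    """Simplified stand-in: stores the operator on self.operator."""

    def __init__(self, operator) -> None:
        self.operator = operator


class MyExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # "MyOperator" is a hypothetical operator class name.
        return ["MyOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        # Called before execute(): only statically known metadata is available.
        return TaskMetadata(
            name=f"my_dag.{self.operator.task_id}",
            inputs=[Dataset(namespace="example", name=self.operator.source_table)],
            outputs=[Dataset(namespace="example", name=self.operator.target_table)],
        )

    def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]:
        # Called after execute(): attributes set during execution (like
        # SnowflakeOperator's query_ids) are now available on the operator.
        metadata = self.extract()
        # In real code this value would be wrapped in a proper facet object.
        metadata.run_facets["query_ids"] = getattr(self.operator, "query_ids", [])
        return metadata
```

The split between the two methods mirrors the lifecycle described above: keep `extract` limited to what is known before execution, and reserve `extract_on_complete` for anything the operator only produces while running.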
## Registering a custom extractor
The OpenLineage integration does not know that you've provided an extractor unless you register it.
The way to do that is to add its full import path to the `OPENLINEAGE_EXTRACTORS` environment variable.
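For example, assuming your class lives under the hypothetical import path `my_company.extractors.MyExtractor`:

```shell
# Register a custom extractor by its full import path (path is hypothetical):
export OPENLINEAGE_EXTRACTORS=my_company.extractors.MyExtractor
```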
If you have multiple custom extractors, separate the paths with a semicolon (`;`). Optionally, you can also separate them with whitespace, which is useful if you're providing them as part of some YAML file.
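For instance, in a YAML-based deployment config (a hypothetical Helm- or Compose-style fragment; the extractor paths are placeholders), a folded scalar lets you put each path on its own line:

```yaml
# Hypothetical YAML fragment: semicolon-separated paths, one per line
environment:
  OPENLINEAGE_EXTRACTORS: >-
    full.path.to.FirstExtractor;
    full.path.to.SecondExtractor
```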
Remember to make sure that the path is importable for both the scheduler and the workers.
## Adding an extractor to the OpenLineage Airflow integration package
All OpenLineage extractors are defined in this path.
In order to add a new extractor, put your code in this directory. Additionally, you need to add the class to the `_extractors` list in `extractors.py`, e.g.:

```python
_extractors = list(
    filter(
        lambda t: t is not None,
        [
            YourExtractor,  # your new extractor class
            ...  # other extractors are listed here
        ],
    )
)
```
There are two common problems associated with custom extractors.
The first is a wrong path provided to `OPENLINEAGE_EXTRACTORS`. The path needs to be exactly the same as the one you'd use from your own code. If the path is wrong, or not importable from the worker, the plugin will fail to load the extractor, and proper OpenLineage events for that operator won't be emitted.
The second, and maybe more insidious, problem is imports from Airflow. Because OpenLineage code gets instantiated when the Airflow worker itself starts, any import from Airflow can be unnoticeably cyclical. This causes OpenLineage extraction to fail.
To avoid this issue, import from Airflow only locally - inside the `extract` and `extract_on_complete` methods. If you need Airflow imports only for type checking, guard them behind `typing.TYPE_CHECKING`.
You can also check the Development section to learn more about how to set up a development environment and create tests.