The OpenLineage integration works by detecting which Airflow operators your DAG uses and extracting lineage data from them with extractors.
However, there are hundreds of operators in Airflow. In addition, many people and teams write their own operators to
automate repeatable work - like using the same code from
As a result, most of those operators aren't supported by OpenLineage out of the box. To handle this, OpenLineage lets you provide custom extractors for any operator.
Custom extractors have to derive from
Extractors have three methods to implement: extract, extract_on_complete, and get_operator_classnames.
The last one is a classmethod that provides the list of operators your extractor can get lineage from.
def get_operator_classnames(cls) -> List[str]:
If the operator's name matches one of the names on the list, the extractor is instantiated, with the operator available in the extractor's self.operator property, and both the extract and extract_on_complete methods are called.
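The matching step described above can be sketched roughly as follows. This is a simplified illustration rather than the integration's actual code: `ExtractorSketch`, `MyOperator`, `MyExtractor`, and `find_extractor` are hypothetical names, and the real methods may take additional arguments.

```python
from typing import List, Optional, Sequence, Type


class ExtractorSketch:
    """Hypothetical, simplified stand-in for an extractor base class."""

    def __init__(self, operator: object) -> None:
        # The matched operator instance is kept on the extractor.
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        raise NotImplementedError

    def extract(self) -> dict:
        raise NotImplementedError


class MyOperator:
    """Hypothetical custom operator that writes to a single table."""

    def __init__(self, table: str) -> None:
        self.table = table


class MyExtractor(ExtractorSketch):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Names of the operator classes this extractor can handle.
        return ["MyOperator"]

    def extract(self) -> dict:
        return {"outputs": [self.operator.table]}


def find_extractor(
    operator: object, extractors: Sequence[Type[ExtractorSketch]]
) -> Optional[ExtractorSketch]:
    """Return an extractor instance whose class-name list contains the operator's class name."""
    name = type(operator).__name__
    for extractor_cls in extractors:
        if name in extractor_cls.get_operator_classnames():
            return extractor_cls(operator)
    return None  # no extractor claims this operator
```

In this sketch the lookup is by class name only, so an operator that no extractor lists is simply skipped.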
They provide the actual lineage data. The difference is that extract is called before the operator executes, while extract_on_complete is called after. The latter can be used to extract any additional information that the operator sets on its own properties. A good example is SnowflakeOperator, which sets query_ids after execution.
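The difference in timing can be illustrated with a toy example. Everything here is hypothetical: `FakeQueryOperator` only imitates an operator that fills in `query_ids` while running, and the extractor returns plain dictionaries instead of real lineage objects.

```python
from typing import List


class FakeQueryOperator:
    """Hypothetical operator that learns its query IDs only while executing."""

    def __init__(self, sql: str) -> None:
        self.sql = sql
        self.query_ids: List[str] = []  # empty until execute() has run

    def execute(self) -> None:
        # A real operator would run the query; here we only record a made-up ID.
        self.query_ids = ["query-1"]


class FakeQueryExtractor:
    """Hypothetical extractor showing what each method can see."""

    def __init__(self, operator: FakeQueryOperator) -> None:
        self.operator = operator

    def extract(self) -> dict:
        # Called before execution: only static properties are available.
        return {"sql": self.operator.sql, "query_ids": list(self.operator.query_ids)}

    def extract_on_complete(self) -> dict:
        # Called after execution: properties set during the run are now visible.
        return {"sql": self.operator.sql, "query_ids": list(self.operator.query_ids)}


operator = FakeQueryOperator("SELECT 1")
extractor = FakeQueryExtractor(operator)
before = extractor.extract()
operator.execute()
after = extractor.extract_on_complete()
```

Both methods read the same property here; only the moment at which they are called changes what they find.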
Both methods return
name: str = attr.ib() # deprecated
inputs: List[Dataset] = attr.ib(factory=list)
outputs: List[Dataset] = attr.ib(factory=list)
run_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
job_facets: Dict[str, BaseFacet] = attr.ib(factory=dict)
Inputs and outputs are lists of plain OpenLineage datasets
Add spec description that would be used in those links.
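To make the shape of the returned object more concrete, here is a rough stand-in built with the standard library's dataclasses instead of attr. `DatasetSketch` and `ExtractedMetadata` are hypothetical names, and facets are reduced to plain dictionaries; the real classes come from the OpenLineage packages.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class DatasetSketch:
    """Hypothetical minimal dataset: a namespace plus a name."""

    namespace: str
    name: str


@dataclass
class ExtractedMetadata:
    """Hypothetical mirror of the fields listed above."""

    name: str
    inputs: List[DatasetSketch] = field(default_factory=list)
    outputs: List[DatasetSketch] = field(default_factory=list)
    run_facets: Dict[str, dict] = field(default_factory=dict)
    job_facets: Dict[str, dict] = field(default_factory=dict)


metadata = ExtractedMetadata(
    name="my_dag.my_task",
    inputs=[DatasetSketch(namespace="postgres://db", name="public.orders")],
    outputs=[DatasetSketch(namespace="postgres://db", name="public.order_totals")],
)
```

Using a factory for the list and dictionary defaults, as the original fields do, keeps each instance's collections independent of one another.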
Registering a custom extractor
The OpenLineage integration does not know that you've provided an extractor unless you register it.
To do that, add the extractor's full import path to the OPENLINEAGE_EXTRACTORS environment variable.
If you have multiple custom extractors, separate the paths with a semicolon (;).
Optionally, you can separate them with whitespace. This is useful if you're providing them as part of a YAML file.
A second way to register extractors, available as a workaround for Airflow 1.10.x only, is to pass all additional operator-extractor pairings through the
lineage_custom_extractors argument in
There are two common problems associated with custom extractors.
The first is a wrong path provided to
The path needs to be exactly the same as the one you'd use from your code. If the path is wrong, the extractor won't be imported
and OpenLineage events won't be emitted.
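One way to catch a wrong path early is to try importing it yourself before deploying. The helper below is a minimal sketch using only the standard library; `import_from_path` and `check_extractor_path` are hypothetical names, not part of OpenLineage, and the integration's own import logic may differ.

```python
import importlib


def import_from_path(path: str):
    """Import a dotted path such as 'package.module.ClassName' and return the named attribute."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a full dotted path")
    # Raises ModuleNotFoundError (a subclass of ImportError) if the module is missing.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attr_name!r}") from exc


def check_extractor_path(path: str) -> bool:
    """Return True if the path can be imported exactly as written."""
    try:
        import_from_path(path)
    except ImportError:
        return False
    return True
```

Running `check_extractor_path` with the same string you plan to register, in the same Python environment the worker uses, shows whether the path resolves.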
The second, and perhaps more insidious, problem is imports from Airflow. Because OpenLineage code is instantiated when the Airflow worker itself starts, any import from Airflow can be cyclical without being noticed. This causes OpenLineage extraction to fail.
To avoid this issue, import from Airflow only locally, inside the extract or extract_on_complete methods. If you need imports for
type checking, guard them behind
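Both precautions can be sketched as follows. This example assumes the standard `typing.TYPE_CHECKING` constant as the guard for type-only imports; `LocalImportExtractor` and the `MyOperator` name are hypothetical, and the extractor base class is left out so the sketch stays standalone.

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Seen only by static type checkers and skipped at runtime,
    # so it cannot take part in an import cycle when the worker starts.
    from airflow.models import BaseOperator


class LocalImportExtractor:
    """Hypothetical extractor that defers every Airflow import."""

    def __init__(self, operator: "BaseOperator") -> None:
        # The annotation is a string, so nothing is imported here.
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["MyOperator"]  # hypothetical operator name

    def extract(self) -> dict:
        # Airflow is imported only when this method runs,
        # not when the module containing the extractor is loaded.
        from airflow.models import BaseOperator

        return {"is_airflow_operator": isinstance(self.operator, BaseOperator)}
```

Defining the class does not require Airflow to be importable; only calling `extract` does, which is the point at which the worker has already finished starting.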