A technical deep-dive on how the Airflow OSS and OpenLineage OSS projects interact.
Airflow Operators and OpenLineage Extractors have a specific, if quirky, way of working together. That relationship was recently overhauled, and the new SQL Check Extractors introduce yet another way for extractors to interact with operators. In this blog, we'll demystify these relationships.
Note: This blog post describes the relationships of the operators and extractors only for
The Operator and the Extractor
Some quick definitions are in order before we continue.
The Airflow Operator defines a task, which is the unit of work in Airflow. All operators inherit from the
BaseOperator, and in addition to taking the arguments of the
BaseOperator, they can take arguments specific to the kind of task they are going to perform, such as a specific
conn_id to connect to a datasource or a dictionary of checks to perform on that datasource.
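To make the shape of an operator concrete, here is a minimal sketch. It uses a stand-in BaseOperator rather than Airflow's real class so that it runs without an Airflow installation, and ColumnCheckSketchOperator with its checks argument is a hypothetical name chosen only for illustration.

```python
class BaseOperator:
    """Stand-in for Airflow's BaseOperator (assumption: only task_id is modeled)."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ColumnCheckSketchOperator(BaseOperator):
    """Hypothetical operator: accepts the base arguments plus task-specific ones."""

    def __init__(self, conn_id, checks, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id  # which datasource to connect to
        self.checks = checks    # dictionary of checks to perform on it

    def execute(self, context):
        # A real operator would query the datasource; this sketch only
        # reports which columns have checks configured.
        return sorted(self.checks)


op = ColumnCheckSketchOperator(
    task_id="check_orders",
    conn_id="my_db",
    checks={"order_id": {"null_check": {"equal_to": 0}}},
)
```

The base arguments (here only task_id) pass through to the parent constructor, while conn_id and checks stay on the subclass.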
The OpenLineage Extractor is somewhat analogous to the Airflow Operator: it is a unit of work in OpenLineage, which takes the relevant input and output data from an operator, creates OpenLineage data facets, and sends those facets to be displayed in Marquez or Datakin. Each extractor maps to a specific set of operators via the
get_operator_classnames() class method. The extractors all inherit from a
BaseExtractor, which defines a few abstract methods, importantly
Briefly, the two other major OpenLineage constructs in this story are the
ExtractorManager, which is responsible for identifying the correct extractor to use, and the
Listener, which is the connecting piece between OpenLineage and Airflow.
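The relationship between extractors and the manager can be sketched roughly as follows. This is not the OpenLineage source: every class name ending in Sketch is hypothetical, the operator is a stand-in, and the metadata is reduced to a plain dictionary.

```python
from abc import ABC, abstractmethod


class BaseExtractorSketch(ABC):
    """Hypothetical, simplified base class for extractors."""

    def __init__(self, operator):
        self.operator = operator

    @classmethod
    @abstractmethod
    def get_operator_classnames(cls):
        """Names of the operator classes this extractor handles."""

    @abstractmethod
    def extract(self):
        """Return lineage metadata for the wrapped operator."""


class PostgresExtractorSketch(BaseExtractorSketch):
    @classmethod
    def get_operator_classnames(cls):
        return ["PostgresOperator"]

    def extract(self):
        return {"inputs": [self.operator.table], "outputs": []}


class ExtractorManagerSketch:
    """Picks an extractor by looking up the operator's class name."""

    def __init__(self, extractor_classes):
        self.by_operator = {
            name: ext
            for ext in extractor_classes
            for name in ext.get_operator_classnames()
        }

    def get_extractor(self, operator):
        ext_cls = self.by_operator.get(type(operator).__name__)
        return ext_cls(operator) if ext_cls else None


class PostgresOperator:
    """Stand-in operator (assumption: only a table name is modeled)."""

    def __init__(self, table):
        self.table = table


manager = ExtractorManagerSketch([PostgresExtractorSketch])
extractor = manager.get_extractor(PostgresOperator("orders"))
metadata = extractor.extract()
```

An operator with no registered extractor simply yields None in this sketch, so no lineage is collected for it.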
Next, we'll walk through what happens when an Airflow instance with OpenLineage support runs a DAG, and how that operator data makes it to the Marquez or Datakin UI.
First, a DAG is born. When the DAG is run, the scheduler runs the operators in order by calling their
execute() method. This is the first time the OpenLineage
Listener responds. Triggered by the
execute() event, it calls the
Manager, which identifies the correct extractor based on the
task_id. Then the
extractor's extract() method is run, potentially returning lineage data in the form of a metadata object. When the operator is done, whether it succeeded or failed, the
Listener calls the
Manager again, and this time the
Manager triggers the
extract_on_complete() method, which may also return metadata based on the result of the task. The metadata object is then sent to Marquez or Datakin, where the data is displayed.
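The sequence above can be sketched end to end with stand-in classes. Everything here is a simplification under stated assumptions: the listener hooks on_start and on_complete are hypothetical names, the manager lookup is reduced to a dictionary keyed on operator class name, and a plain list stands in for the Marquez or Datakin backend.

```python
class QueryOperator:
    """Stand-in operator; `rows` is only known after execute() has run."""

    def __init__(self, task_id, table):
        self.task_id = task_id
        self.table = table
        self.rows = None

    def execute(self):
        self.rows = 3  # pretend the query returned three rows


class QueryExtractor:
    def __init__(self, operator):
        self.operator = operator

    def extract(self):
        # Before the task runs, only static information is available.
        return {"task": self.operator.task_id, "inputs": [self.operator.table]}

    def extract_on_complete(self):
        # After the task runs, results of the run can be attached.
        metadata = self.extract()
        metadata["rows"] = self.operator.rows
        return metadata


class Manager:
    def __init__(self, extractors):
        self.extractors = extractors  # operator class name -> extractor class

    def extract_metadata(self, operator, complete=False):
        extractor_cls = self.extractors.get(type(operator).__name__)
        if extractor_cls is None:
            return None
        extractor = extractor_cls(operator)
        return extractor.extract_on_complete() if complete else extractor.extract()


class Listener:
    def __init__(self, manager, backend):
        self.manager = manager
        self.backend = backend  # stand-in for the lineage backend

    def on_start(self, operator):
        self.backend.append(("START", self.manager.extract_metadata(operator)))

    def on_complete(self, operator):
        self.backend.append(
            ("COMPLETE", self.manager.extract_metadata(operator, complete=True))
        )


events = []
listener = Listener(Manager({"QueryOperator": QueryExtractor}), events)
task = QueryOperator(task_id="count_orders", table="orders")

listener.on_start(task)     # extract() runs as the task starts
task.execute()              # the operator does its work
listener.on_complete(task)  # extract_on_complete() can see the results
```

The second event carries the row count because it is produced only after the operator has finished, which is the reason a separate completion hook exists.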
SQL Check Operators/Extractors
SQLCheckExtractors work slightly differently than the interplay outlined above. The biggest difference is that the
SQLCheckExtractors all inherit from a
BaseSqlCheckExtractor, which in turn dynamically inherits from the appropriate extractor at runtime. That parent is always an existing SQL database extractor. The
BaseSqlCheckExtractor itself only implements the
extract_on_complete() method by determining whether the superclass's
extract_on_complete() method should be run to gather metadata. The
_get_input_facets() methods are all implemented by the particular check extractors, and are called in the superclass's
The dynamic inheritance is done by defining the
SQLCheckExtractors inside the
get_check_extractors() function that takes a class as a parameter and passes that class to the constructor of the
BaseSqlCheckExtractor. When a
SQLCheckOperator is run, and the
Manager searches for the correct extractor to use, it calls
instantiate_abstract_extractors() with the given task instance.
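Dynamic inheritance of this kind is usually done in Python by defining classes inside a function, so the base class can be a parameter. The sketch below shows the technique only; make_check_extractors, the extractor stand-ins, and the dictionary-shaped operator are all hypothetical and are not the OpenLineage implementation.

```python
class PostgresExtractorSketch:
    """Stand-in for an existing SQL database extractor."""

    def __init__(self, operator):
        self.operator = operator

    def extract_on_complete(self):
        return {"source": "postgres", "inputs": [self.operator["table"]]}


class SnowflakeExtractorSketch:
    """A second stand-in, to show that the parent really is swappable."""

    def __init__(self, operator):
        self.operator = operator

    def extract_on_complete(self):
        return {"source": "snowflake", "inputs": [self.operator["table"]]}


def make_check_extractors(super_cls):
    """Define check extractors whose base class is chosen by the caller."""

    class BaseCheckExtractor(super_cls):
        def extract_on_complete(self):
            # Reuse the database extractor's metadata, then add check facets.
            metadata = super().extract_on_complete()
            metadata["input_facets"] = self._get_input_facets()
            return metadata

        def _get_input_facets(self):
            raise NotImplementedError

    class ColumnCheckExtractor(BaseCheckExtractor):
        def _get_input_facets(self):
            return {"checks_passed": self.operator["passed"]}

    return [ColumnCheckExtractor]


(pg_check,) = make_check_extractors(PostgresExtractorSketch)
(sf_check,) = make_check_extractors(SnowflakeExtractorSketch)

operator = {"table": "orders", "passed": True}
result = sf_check(operator).extract_on_complete()
```

Each call to the factory produces a fresh set of classes, so the same check logic can sit on top of whichever database extractor matches the task.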
In this function, the task instance is used to find the correct extractor that will be the superclass of the
BaseSqlCheckExtractor. To do this, the function uses a set of hard-coded operator names whose extractors will dynamically inherit from the found superclass. Currently, this list is the set of
SQLCheckOperators, which corresponds to a dictionary of extractor keys and
conn_type values. The given task’s class name is checked against the set of operator names, and if it is in the set, a loop compares the existing extractor’s corresponding
conn_type from the aforementioned dictionary to the given task instance’s
conn_type retrieved from Airflow connections. If there’s a match, the
get_check_extractors() function is called with the matched extractor, instantiating all the check extractors with the correct superclass.
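The matching step can be pictured as a set membership test followed by a loop over a dictionary. This is a sketch under assumptions: the function name find_super_extractor and the contents of both collections are illustrative, and the task's conn_type is passed in directly instead of being looked up from Airflow connections.

```python
# Illustrative values only; the real set and dictionary live in OpenLineage.
CHECK_OPERATOR_NAMES = {"SQLCheckOperator", "SQLColumnCheckOperator"}
EXTRACTOR_CONN_TYPES = {
    "PostgresExtractor": "postgres",
    "SnowflakeExtractor": "snowflake",
}


def find_super_extractor(operator_name, task_conn_type):
    """Return the name of the extractor to inherit from, or None."""
    if operator_name not in CHECK_OPERATOR_NAMES:
        return None  # not a check operator, so no dynamic inheritance
    for extractor_name, conn_type in EXTRACTOR_CONN_TYPES.items():
        if conn_type == task_conn_type:
            return extractor_name
    return None  # check operator, but no extractor for this connection type


match = find_super_extractor("SQLColumnCheckOperator", "snowflake")
```

In this sketch a None result covers both an operator outside the set and a connection type with no matching extractor.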
SQLCheckExtractors only rely on the
extract_on_complete() method, because the values needed from the operators, i.e. the results of the query and the success or failure of each check, are only available after the operator completes.