The Python Client -- the Foundation of OpenLineage Integrations
The Python client enables users to create custom integrations.
Introduction
Thanks to the OpenLineage community’s active work on integrations, the pursuit of lineage is getting more efficient and effective all the time. And our growing list of partners and adapters makes OpenLineage plenty powerful out of the box. At the same time, the nature of the data engineering field means that lineage capture is an ongoing process – simply put, the work of lineage is never done.
Hence, as lineage capture becomes integral to your pipelines, situations can arise that require new custom integrations. Enter the Python client, one of two built-in clients included in the project (the other being the Java client). The OpenLineage spec is defined using JSON schema, but we have created these clients so that new integrations do not have to reinvent the wheel.
OpenLineage’s Python client enables the creation of lineage metadata events with Python code. The core data structures currently offered by the client include the RunEvent
, RunState
, Run
, Job
, Dataset
, and Transport
classes. These either configure or collect data for the emission of lineage events.
In the history of the project, the client has been useful in helping us avoid unnecessary duplication of code. It is also integral to OpenLineage’s existing integrations, serving as the basis of the Airflow and dbt integrations, for example. It could also act as the foundation of your own custom integration should you need to write one. (Another use case: tests for a new Airflow extractor.)
For this reason, an existing integration can serve as a useful example of how to use the client to write a new integration (and, hopefully, contribute it back to the project!). What follows is an overview of the Python client and the dbt integration, which uses the Python client. You’ll see how the client receives metadata from dbt to make it available to a consumer of OpenLineage such as Microsoft Purview, Amundsen, Astronomer, Egeria or Marquez.
Python Client Data Structures
The core structures of the client organize metadata about the foundational objects of the OpenLineage spec: runs, jobs and datasets.
A dataset
is a class consisting of a name
, namespace
and, optionally, facets
array:
@attr.s
class Dataset:
namespace: str = attr.ib()
name: str - attr.ib()
Facets: Dict = attr.ib(factory=dict)
The same goes for a job
:
@attr.s
class Job:
namespace: str = attr.ib()
name: str - attr.ib()
Facets: Dict = attr.ib(factory=dict)
A RunEvent
sends the time, state, job, run, producer, input and output information needed to construct an OpenLineage job run event:
@attr.s
class RunEvent:
eventType: RunState = attr.ib(validator=attr.validators.in_(RunState))
eventTime: str = attr.ib()
run: Run = attr.ib()
job: Job = attr.ib()
producer: str = attr.ib()
inputs: Optional[List[Dataset]] = attr.ib(factory=list)
outputs: Optional[List[Dataset]] = attr.ib(factory=list)
The OpenLineage-dbt Integration
At a high level, the dbt integration uses the Python client to push metadata to the OpenLineage backend. The metadata it makes available comprises the run lifecycle, including any dataset inputs and outputs accessed during a job run.
In the dbt-ol
script, the integration uses the project’s ParentRunMetadata
and DbtArtifactProcessor
classes, both of which can be found in the OpenLineage common integration, to parse metadata from the dbt manifest
and run_result
in order to produce OpenLineage events:
from openlineage.common.provider.dbt import DbtArtifactProcessor, ParentRunMetadata
#…
if parent_id:
parent_namespace, parent_job_name, parent_run_id = parent_id.split(‘/’)
parent_run_metadata = ParentRunMetadata(
run_id=parent_run_id,
job_name=parent_job_name,
job_namespace=parent_namespace
)
processor = DbtArtifactProcessor(
producer=PRODUCER,
target=target,
job_namespace=job_namespace,
project_dir=project_dir,
profile_name=profile_name,
logger=logger
)
The integration uses a wrapper for dbt runs because start and complete events are not emitted until execution concludes:
dbt_run_event = dbt_run_event_start(
job_name=f“dbt-run-{processor.project[‘name’]}”,
job_namespace=job_namespace,
parent_run_metadata=parent_run_metadata
)
dbt_run_metadata = ParentRunMetadata(
run_id=dbt_run_event.run.runId,
job_name=dbt_run_event.job.name,
job_namespace=dbt_run_event.job.namespace
)
processor.dbt_run_metadata = dbt_run_metadata
After executing dbt, the script parses the metadata using the processor and emits a run event:
events = processor.parse().events()
client.emit(dbt_run_event_end(
run_id=dbt_run_metadata.run_id,
job_namespace=dbt_run_metadata.job_namespace,
job_name=dbt_run_metadata.job_name,
parent_run_metadata=parent_run_metadata
))
logger.info(f"Emitted {len(events) + 2} openlineage events")
Additional Resources
Check out the source code here: https://github.com/OpenLineage/OpenLineage/tree/main/client/python.
Interested in contributing to the project? Read our guide for new contributors: https://github.com/OpenLineage/OpenLineage/blob/main/CONTRIBUTING.md.
Join us on Slack: https://join.slack.com/t/openlineage/shared_invite/zt-2u4oiyz5h-TEmqpP4fVM5eCdOGeIbZvA.
Attend a community meeting: https://bit.ly/OLwikitsc.