Flink 2.x
Overview
With the release of Apache Flink 2.0, the OpenLineage integration has been updated to utilize the native API for lineage extraction, which was initially proposed in FLIP-314. This new API allows for a more efficient and streamlined approach to lineage extraction, eliminating the need for modifications to the job code. The other advantage of this implementation is that it supports Flink SQL, which was not possible with the previous version.
At the same time, it is the Flink's connectors which contain implementation of sources and sinks, which are responsible for providing methods to extract lineage information. This poses a challenge for the OpenLineage integration, as it requires the connectors to implement the lineage interfaces. Currently, only the Kafka connector supports this functionality.
Usage
To enable OpenLineage integration in Flink 2.x, a job status change listener has to be configured as described in Flink docs.
This can be achieved by including openlineage-flink package on the classpath and providing extra config:
execution.job-status-changed-listeners = io.openlineage.flink.listener.OpenLineageJobStatusChangedListenerFactory
Please refer to configuration section for more details about the configuration options.
Implementation
OpenLineage implements io.openlineage.flink.listener.OpenLineageJobStatusChangedListener which is a subclass
of org.apache.flink.core.execution.JobStatusChangedListener. One of its subclasses is
org.apache.flink.streaming.runtime.execution.JobCreatedEvent which contains a method
that returns LineageGraph object. This object contains all the lineage information about the job.
Additionally, after a job is triggered, OpenLineage integration starts job tracker thread that periodically polls lineage metadata updates from Flink jobs API. Currently, it is used to collect information about the checkpoints processed.
Column Level Lineage
Unfortunately, lineage interfaces in Flink 2.x do not provide column level lineage information. In general, this may be difficult to extract for the transformations defined through the programming language. However, it is possible to extract column level lineage information for Flink SQL jobs.
Following PR contains a potential extension to Flink to make it available. Please refer to this document for more information about the implementation.
Special Detached Job Tracking Flow
How to know if you need this flow
You likely need detached job tracking if either of these is true:
- You're submitting your job in detached mode, with the job client running on a remote machine meaning the client and the JobManager are in different JVMs.
- In the default flow, you're only seeing the
STARTevent and noRUNNING,COMPLETE, orFAILevents afterward, which is a strong signal that your job is being submitted remotely in detached mode.
Enabling it
Enable the flow by setting openlineage.flink.enableDetachedJobTracking to true. See the
configuration section for this option and the related
openlineage.flink.detachedStartEventEmitTimeoutInSeconds timeout.
With this enabled, the START event is emitted from the submitting (client) process, while later status
events (RUNNING, COMPLETE, FAIL) are handled by the JobManager listener.
How it works
When you submit remotely in detached mode, the job client and the JobManager run in different JVMs,
typically on different machines. The default flow assumes both lifecycle stages happen in the same JVM:
the client-side START event carries the Flink job ID, and the JobManager-side listener relies on that
same event to initialize its tracking.
Because the client is remote and lives in a separate JVM, the JobManager-side listener never receives
that job ID. It cannot associate incoming status changes with the existing run, so it does not emit OpenLineage
events for them. This is why you see START event but nothing after it.
Detached job tracking fixes this by having the JobManager-side listener reconstruct the same OpenLineage
run id from the Flink job id and the job start time exposed by the Flink jobs API, so the
post-submission lifecycle events correlate back to the original START.
Limitation
Unlike the default flow, detached job tracking does not emit an immediate RUNNING event right after START
to avoid race conditions. Later RUNNING events will be emitted as normal from checkpoint tracking using the
openlineage.trackingIntervalInSeconds interval.