This integration is considered experimental: only specific workflows and use cases are supported.
Apache Flink is one of the most popular stream processing frameworks. Apache Flink jobs run on clusters,
which are composed of two types of nodes: TaskManagers and JobManagers. While clusters typically consist of
multiple TaskManagers, the only reason to run multiple JobManagers is high availability. Jobs are submitted
to the JobManager by the JobClient, which compiles the user application into a dataflow graph that the
JobManager can understand. The JobManager then coordinates job execution: it distributes the parallel units of a job
to the TaskManagers, manages heartbeats, triggers checkpoints, reacts to failures and much more.
Apache Flink has multiple deployment modes - Session Mode, Application Mode and Per-Job Mode. The most popular
are Session Mode and Application Mode. In Session Mode, a single JobManager manages multiple jobs sharing one
Flink cluster; the JobClient is executed on the machine that submits the job to the cluster.
Application Mode is used when the cluster is utilized for a single job. In this mode, the JobClient,
where the main method runs, is executed on the cluster itself.
Flink jobs read data from Sources and write data to Sinks. In contrast to systems like Apache Spark, Flink jobs can write
data to multiple places - they can have multiple Sinks.
Getting lineage from Flink
OpenLineage utilizes Flink's JobListener interface. This interface is used by Flink to notify the user of job submission,
successful completion of a job, or job failure. Implementations of this interface are executed on the JobClient.
When the OpenLineage listener receives the information that a job was submitted, it extracts
Transformations from the job's execution environment. The Transformations represent logical operations in the dataflow graph; they are composed
of both Flink's built-in operators and user-provided Sources, Sinks and functions. To get the lineage, the
OpenLineage integration processes the dataflow graph. Currently, OpenLineage is interested only in the information contained in
Sources and Sinks, as these are the places where Flink interacts with external systems.
After job submission, the OpenLineage integration starts actively listening to checkpoints - this gives insight into whether the job is running properly.
Currently, OpenLineage's Flink integration is limited to getting information from jobs running in Application Mode.
The OpenLineage integration extracts lineage only from the following sources and sinks.
We expect this list to grow as we add support for more connectors.
(1) KafkaSink supports sinks that write to a single topic as well as multi-topic sinks. The
limitation for multi-topic sinks is that the topics need to have the same schema, and the implementation
of KafkaRecordSerializationSchema must extend KafkaTopicsDescriptor; KafkaTopicsDescriptor is used to extract the multiple topics
from a sink.
In your job, you need to set up the OpenLineageFlinkJobListener and register it on the execution environment:

```java
JobListener listener = OpenLineageFlinkJobListener.builder()
    .executionEnvironment(streamExecutionEnvironment)
    .build();
streamExecutionEnvironment.registerJobListener(listener);
```
Also, OpenLineage needs certain parameters to be set in flink-conf.yaml:

| Configuration Key  | Value | Description |
| ------------------ | ----- | ----------- |
| execution.attached | true  | This setting needs to be true if OpenLineage is to detect job start and failure |
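For example, in flink-conf.yaml:

```yaml
# Required so that the OpenLineage listener can detect job start and failure
execution.attached: true
```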
The OpenLineage jar needs to be present on the JobManager. Once the
JobListener is configured, you need to point the OpenLineage integration to where the events should end up.
If you're using Marquez, the simplest way to do that is to set the OPENLINEAGE_URL environment variable to
Marquez's URL. More advanced settings can be found in the client documentation.
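For instance, assuming a local Marquez instance listening on port 5000 (the host and port here are placeholders for illustration):

```shell
# Point the OpenLineage client at the Marquez API (host and port are placeholders)
export OPENLINEAGE_URL=http://localhost:5000
```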
Configuring OpenLineage connector
The Flink OpenLineage connector utilizes the standard Java client for OpenLineage and allows all the configuration features present there to be used. The configuration can be passed with:

- An openlineage.yml file, with the environment property OPENLINEAGE_CONFIG set and pointing to the configuration file. The file structure and allowed options are described here.
- Standard Flink configuration with the parameters defined below.
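To illustrate the first option, a minimal openlineage.yml could look like this (the URL is an assumed local Marquez address; see the client documentation for all available options):

```yaml
transport:
  type: http
  url: http://localhost:5000
```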
Flink Configuration parameters
The following parameters can be specified:
| Parameter                           | Definition |
| ----------------------------------- | ---------- |
| openlineage.transport.type          | The transport type used for event emit; the default type is console |
| openlineage.facets.disabled         | List of facets to disable, enclosed in [] (required from 0.21.x) and separated by ; |
| openlineage.transport.endpoint      | Path to the resource |
| openlineage.transport.auth.apiKey   | An API key to be used when sending events to the OpenLineage server |
| openlineage.transport.timeout       | Timeout for sending OpenLineage info in milliseconds |
| openlineage.transport.urlParams.xyz | A URL parameter (replace xyz) and value to be included in requests to the OpenLineage API server |
| openlineage.transport.url           | The hostname of the OpenLineage API server where events should be reported; it can have other properties embedded |
| openlineage.transport.headers.xyz   | A request header (replace xyz) and value to be included in requests to the OpenLineage API server |
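Put together, a minimal HTTP transport configuration in flink-conf.yaml might look like this (the host and API key are placeholder values, not defaults):

```yaml
openlineage.transport.type: http
openlineage.transport.url: http://localhost:5000
openlineage.transport.endpoint: /api/v1/lineage
openlineage.transport.auth.apiKey: abcdefgh
```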
You can also supply HTTP transport parameters by embedding them in the URL; the parsed
openlineage.* properties are located in the URL as follows:
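As a sketch (the host, API key and parameter values are made up, and exact parsing behavior depends on the client version), a URL with embedded properties and its parsed equivalent:

```yaml
# URL with embedded properties:
openlineage.transport.url: http://localhost:5000/api/v1/lineage?api_key=abc123&xyz=value

# is parsed into the equivalent of:
# openlineage.transport.url: http://localhost:5000
# openlineage.transport.endpoint: /api/v1/lineage
# openlineage.transport.auth.apiKey: abc123
# openlineage.transport.urlParams.xyz: value
```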
If openlineage.transport.type is set to
kinesis, then the below parameters are read and used when building the KinesisProducer.
Also, the KinesisTransport requires you to provide the artifact
com.amazonaws:amazon-kinesis-producer:0.14.0 or compatible on your classpath.

| Parameter                              | Definition |
| -------------------------------------- | ---------- |
| openlineage.transport.streamName       | Required; the streamName of the Kinesis Stream |
| openlineage.transport.region           | Required; the region of the stream |
| openlineage.transport.roleArn          | Optional; the roleArn which is allowed to read/write to the Kinesis stream |
| openlineage.transport.properties.[xxx] | Optional; [xxx] is any allowed Kinesis producer property |
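For example (the stream name and region are illustrative values, not defaults):

```yaml
openlineage.transport.type: kinesis
openlineage.transport.streamName: openlineage-events
openlineage.transport.region: us-east-1
```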
If openlineage.transport.type is set to
kafka, then the below parameters are read and used when building the KafkaProducer.

| Parameter                              | Definition |
| -------------------------------------- | ---------- |
| openlineage.transport.topicName        | Required; name of the topic |
| openlineage.transport.localServerId    | Required; id of the local server |
| openlineage.transport.properties.[xxx] | Optional; [xxx] is any Kafka client property |
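For example (the topic name, server id and broker address are illustrative values; bootstrap.servers is one example of a Kafka client property passed through the properties prefix):

```yaml
openlineage.transport.type: kafka
openlineage.transport.topicName: openlineage-events
openlineage.transport.localServerId: flink-cluster-1
openlineage.transport.properties.bootstrap.servers: localhost:9092
```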
Please note that configuration parameters provided via the standard Flink configuration are translated into OpenLineage Java client config entries; whenever a new configuration feature is added to the Java client, it becomes available to Flink users out of the box with no changes required.