Apache Flink

Apache Flink is one of the most popular stream processing frameworks. Flink jobs run on clusters composed of two types of nodes: TaskManagers and JobManagers. While a cluster typically consists of multiple TaskManagers, the only reason to run multiple JobManagers is high availability. Jobs are submitted to the JobManager by the JobClient, which compiles the user application into a dataflow graph that the JobManager understands. The JobManager then coordinates job execution: it distributes the parallel units of a job to TaskManagers, manages heartbeats, triggers checkpoints, reacts to failures, and much more.

Apache Flink has multiple deployment modes: Session Mode, Application Mode, and Per-Job Mode. The most popular are Session Mode and Application Mode. In Session Mode, a JobManager manages multiple jobs that share a single Flink cluster. In this mode, the JobClient is executed on the machine that submits the job to the cluster.

Application Mode is used when a cluster is dedicated to a single job. In this mode, the JobClient, where the main method runs, is executed on the JobManager.

Flink jobs read data from Sources and write data to Sinks. In contrast to systems like Apache Spark, Flink jobs can write data to multiple places - they can have multiple Sinks.

OpenLineage utilizes Flink's JobListener interface. Flink uses this interface to notify the user of job submission, successful completion, or failure. Implementations of this interface are executed on the JobClient.

When the OpenLineage listener is notified that a job was submitted, it extracts Transformations from the job's ExecutionEnvironment. Transformations represent logical operations in the dataflow graph; they comprise both Flink's built-in operators and user-provided Sources, Sinks, and functions. To obtain lineage, the OpenLineage integration processes the dataflow graph. Currently, OpenLineage uses only the information contained in Sources and Sinks, as these are the places where Flink interacts with external systems.

After job submission, the OpenLineage integration starts actively listening to checkpoints, which gives insight into whether the job is running properly.

Limitations

Currently, OpenLineage's Flink integration is limited to getting information from jobs running in Application Mode.

The OpenLineage integration extracts lineage only from the following Sources and Sinks:

Sources | Sinks
KafkaSource | KafkaSink (1)
FlinkKafkaConsumer | FlinkKafkaProducer
IcebergFlinkSource | IcebergFlinkSink

We expect this list to grow as we add support for more connectors.

(1) KafkaSink supports sinks that write to a single topic as well as multi-topic sinks. A multi-topic sink has two limitations: all topics must have the same schema, and the implementation of KafkaRecordSerializationSchema must extend KafkaTopicsDescriptor. The isFixedTopics and getFixedTopics methods of KafkaTopicsDescriptor are used to extract the topics from the sink.

Usage

In your job, you need to set up OpenLineageFlinkJobListener.

For example:

JobListener listener = OpenLineageFlinkJobListener.builder()
    .executionEnvironment(streamExecutionEnvironment)
    .build();
streamExecutionEnvironment.registerJobListener(listener);

Also, OpenLineage needs certain parameters to be set in flink-conf.yaml:

Configuration Key | Description | Expected Value | Default
execution.attached | This setting needs to be true if OpenLineage is to detect job start and failure | true | false

The OpenLineage jar needs to be present on the JobManager.

Once the JobListener is configured, you need to tell the OpenLineage integration where events should be sent. If you're using Marquez, the simplest way to do that is to set the OPENLINEAGE_URL environment variable to the Marquez URL. More advanced settings are described in the client documentation.

Configuring the OpenLineage connector

The Flink OpenLineage connector uses the standard Java client for OpenLineage and supports all of its configuration features. The configuration can be passed with:

  • An openlineage.yml file, with the OPENLINEAGE_CONFIG environment variable set and pointing to the configuration file. File structure and allowed options are described here.
  • Standard Flink configuration with the parameters defined below.

The following parameters can be specified:

Parameter | Definition | Example
openlineage.transport.type | The transport type used for event emit, default type is console | http
openlineage.facets.disabled | List of facets to disable, enclosed in [] (required from 0.21.x) and separated by ;, default is [spark_unknown;spark.logicalPlan;] (currently must contain ;) | [some_facet1;some_facet2]
openlineage.job.owners.<ownership-type> | Specifies ownership of the job. Multiple entries with different types are allowed. Config key name and value are used to create job ownership type and name (available since 1.13). | openlineage.job.owners.team="Some Team"

Transports

Tip: See current list of all supported transports.

HTTP

Allows sending events to an HTTP endpoint, using ApacheHTTPClient.

Configuration

  • type - string, must be "http". Required.
  • url - string, base url for HTTP requests. Required.
  • endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
  • urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
  • timeoutInMillis - integer specifying timeout (in milliseconds) value used while connecting to server. Optional, default: 5000.
  • auth - dictionary specifying authentication options. Optional, by default no authorization is used. If set, requires the type property.
    • type - string specifying the "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
    • apiKey - string used as the Bearer token in the Authorization HTTP header. Required if type is api_key.
  • headers - dictionary specifying HTTP request headers. Optional.
  • compression - string, name of algorithm used by HTTP client to compress request body. Optional, default value null, allowed values: gzip. Added in v1.13.0.

Behavior

Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
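The behavior above can be sketched with the JDK's own java.net.http API. This is only an illustration, not the transport's implementation (which the text says is based on ApacheHTTPClient): the class name HttpEmitSketch, the method buildRequest, and the placeholder event body are assumptions made for the example, and the request-level timeout stands in for the connection timeout described by timeoutInMillis.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical sketch: builds (but does not send) the POST request for one event.
final class HttpEmitSketch {

    static HttpRequest buildRequest(String url, String endpoint, String apiKey,
                                    int timeoutInMillis, String eventJson) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url + endpoint))
                // Request-level timeout, used here as a stand-in for timeoutInMillis.
                .timeout(Duration.ofMillis(timeoutInMillis))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(eventJson));
        if (apiKey != null) {
            // api_key auth: the key is sent as a Bearer token.
            builder.header("Authorization", "Bearer " + apiKey);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // The JSON body is a placeholder, not a complete OpenLineage event.
        HttpRequest request = buildRequest("http://localhost:5000", "/api/v1/lineage",
                null, 5000, "{\"eventType\":\"START\"}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would additionally need a java.net.http.HttpClient; it is left out so the sketch runs without a server.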

Examples

Anonymous connection:

transport:
  type: http
  url: http://localhost:5000

With authorization:

transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5

Full example:

transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
  compression: gzip

Kafka

If the transport type is set to kafka, the parameters below are read and used when building the KafkaProducer. This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.

Configuration

  • type - string, must be "kafka". Required.

  • topicName - string specifying the topic to which events will be sent. Required.

  • properties - a dictionary containing the Kafka producer configuration, as described in the Kafka producer config documentation. Required.

  • localServerId - deprecated, renamed to messageKey since v1.13.0.

  • messageKey - string, key for all Kafka messages produced by transport. Optional, default value described below. Added in v1.13.0.

    Default values for messageKey are:

    • run:{parentJob.namespace}/{parentJob.name} - for RunEvent with parent facet
    • run:{job.namespace}/{job.name} - for RunEvent
    • job:{job.namespace}/{job.name} - for JobEvent
    • dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
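The default key patterns listed above can be sketched as plain string formatting. The class and method names below are hypothetical and are not part of the OpenLineage client; a null parent namespace or name is assumed to mean that the event has no parent facet.

```java
// Hypothetical helper mirroring the default messageKey patterns listed above.
final class MessageKeySketch {

    // RunEvent: keyed by the parent job when a parent facet is present, else by the job itself.
    static String runKey(String jobNamespace, String jobName,
                         String parentNamespace, String parentName) {
        boolean hasParent = parentNamespace != null && parentName != null;
        return hasParent
                ? "run:" + parentNamespace + "/" + parentName
                : "run:" + jobNamespace + "/" + jobName;
    }

    // JobEvent
    static String jobKey(String namespace, String name) {
        return "job:" + namespace + "/" + name;
    }

    // DatasetEvent
    static String datasetKey(String namespace, String name) {
        return "dataset:" + namespace + "/" + name;
    }

    public static void main(String[] args) {
        System.out.println(runKey("flink", "wordcount", "airflow", "daily_dag"));
        System.out.println(runKey("flink", "wordcount", null, null));
        System.out.println(datasetKey("kafka://localhost:9092", "input-topic"));
    }
}
```

Because a run event with a parent facet is keyed by the parent job, events from the whole hierarchy share one key.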

Behavior

Events are serialized to JSON, and then dispatched to the Kafka topic.

Notes

Providing messageKey is recommended when a job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, for example Airflow task -> Spark application -> Spark task runs.

Examples

transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  messageKey: some-value

Default values are:

  • run:{parentJob.namespace}/{parentJob.name}/{parentRun.id} - for RunEvent with parent facet
  • run:{job.namespace}/{job.name}/{run.id} - for RunEvent
  • job:{job.namespace}/{job.name} - for JobEvent
  • dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent

Console

This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.

Behavior

Events are serialized to JSON. Each event is then logged at INFO level to the logger named ConsoleTransport.

Notes

Be cautious when using the DEBUG log level, as it might result in double-logging due to the OpenLineageClient also logging.

Configuration

  • type - string, must be "console". Required.

Examples

transport:
  type: console

File

Designed mainly for integration testing, the FileTransport emits OpenLineage events to a given file.

Configuration

  • type - string, must be "file". Required.
  • location - string specifying the path of the file. Required.

Behavior

  • If the target file is absent, it's created.
  • Events are serialized to JSON, and then appended to a file, separated by newlines.
  • Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
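A minimal sketch of the three behaviors above, using only java.nio.file. The class name FileAppendSketch and its emit method are assumptions made for illustration; this is not the FileTransport source code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: appends each event as a single line, creating the file if needed.
final class FileAppendSketch {

    static void emit(Path location, String eventJson) throws IOException {
        // Drop newline characters so that every event occupies exactly one line.
        String oneLine = eventJson.replaceAll("[\\r\\n]", "");
        Files.writeString(location, oneLine + "\n", StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("openlineage-events", ".json");
        emit(file, "{\n\"eventType\": \"START\"\n}");
        emit(file, "{\"eventType\": \"COMPLETE\"}");
        Files.readAllLines(file).forEach(System.out::println);
    }
}
```

Removing raw newlines does not alter the event, since JSON string values cannot contain unescaped line breaks.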

Notes for Yarn/Kubernetes

This transport type is of little use for Spark/Flink applications deployed to a Yarn or Kubernetes cluster:

  • Each executor writes the file to the local filesystem of its Yarn container or K8s pod, so the resulting file is removed when that container or pod is destroyed.
  • Kubernetes persistent volumes are not destroyed after pod removal, but all executors would write to the same network disk in parallel, producing a broken file.

Examples

transport:
  type: file
  location: /path/to/your/file

Composite

The CompositeTransport is designed to combine multiple transports, allowing event emission to several destinations. This is useful when events need to be sent to multiple targets, such as a logging system and an API endpoint. The events are delivered sequentially - one after another in a defined order.

Configuration

  • type - string, must be "composite". Required.
  • transports - a list or a map of transport configurations. Required.
  • continueOnFailure - boolean flag, determines if the process should continue even when one of the transports fails. Default is false.

Behavior

  • The configured transports will be initialized and used in sequence to emit OpenLineage events.
  • If continueOnFailure is set to false, a failure in one transport will stop the event emission process, and an exception will be raised.
  • If continueOnFailure is true, the failure will be logged, but the remaining transports will still attempt to send the event.
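The effect of continueOnFailure can be sketched as a loop over the configured transports. The Emitter interface below is a hypothetical stand-in rather than the OpenLineage client's transport interface, and logging is reduced to System.err.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sequential emission with a continueOnFailure flag.
final class CompositeSketch {

    // Stand-in for a transport; not the OpenLineage client interface.
    interface Emitter {
        void emit(String eventJson);
    }

    static void emitAll(List<Emitter> transports, boolean continueOnFailure, String eventJson) {
        for (Emitter transport : transports) {
            try {
                transport.emit(eventJson);
            } catch (RuntimeException e) {
                if (!continueOnFailure) {
                    throw e; // stop emission; remaining transports are skipped
                }
                System.err.println("Transport failed, continuing: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        List<Emitter> transports = List.of(
                json -> delivered.add("first"),
                json -> { throw new IllegalStateException("endpoint unavailable"); },
                json -> delivered.add("third"));
        emitAll(transports, true, "{}");
        System.out.println(delivered); // prints [first, third]
    }
}
```

With continueOnFailure set to false, the same call would deliver only to the first transport and then rethrow the exception.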

Notes for Multiple Transports

The composite transport can be used with any OpenLineage transport (e.g. HttpTransport, KafkaTransport, etc). Ideal for scenarios where OpenLineage events need to reach multiple destinations for redundancy or different types of processing.

The transports configuration can be provided in two formats:

  1. A list of transport configurations, where each transport may optionally include a name field.
  2. A map of transport configurations, where the key acts as the name for each transport. The map format is particularly useful for configurations set via environment variables or Java properties, providing a more convenient and flexible setup.

Why are transport names used?

Transport names are not required for basic functionality. Their primary purpose is to enable configuration of composite transports via environment variables, which is only supported when names are defined.

Examples

transport:
  type: composite
  continueOnFailure: true
  transports:
    - type: http
      url: http://example.com/api
      name: my_http
    - type: kafka
      topicName: openlineage.events
      properties:
        bootstrap.servers: localhost:9092,another.host:9092
        acks: all
        retries: 3
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
      messageKey: some-value

GcpLineage

To use this transport in your project, you need to include the io.openlineage:transports-gcplineage artifact in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.

Configuration

  • type - string, must be "gcplineage". Required.
  • endpoint - string, specifies the endpoint to which events are sent, default value is datalineage.googleapis.com:443. Optional.
  • projectId - string, the project quota identifier. If not provided, it is determined based on user credentials. Optional.
  • location - string, Dataplex location. Optional, default: "us".
  • credentialsFile - string, path to the Service Account credentials JSON file. Optional; if not provided, Application Default Credentials are used.
  • mode - enum that specifies the type of client used for publishing OpenLineage events to GCP Lineage service. Possible values: sync (synchronous) or async (asynchronous). Optional, default: async.

Behavior

  • Events are serialized to JSON, included as part of a gRPC request, and then dispatched to the GCP Lineage service endpoint.
  • Depending on the mode chosen, requests are sent using either a synchronous or asynchronous client.

Examples

transport:
  type: gcplineage
  projectId: your_gcp_project_id
  location: us
  mode: sync
  credentialsFile: path/to/credentials.json

Google Cloud Storage

To use this transport in your project, you need to include the io.openlineage:transports-gcs artifact in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.

Configuration

  • type - string, must be "gcs". Required.
  • projectId - string, the project quota identifier. Required.
  • credentialsFile - string, path to the Service Account credentials JSON file. Optional; if not provided, Application Default Credentials are used.
  • bucketName - string, the GCS bucket name. Required.
  • fileNamePrefix - string, prefix for the event file names. Optional.

Behavior

  • Events are serialized to JSON and stored in the specified GCS bucket.
  • Each event file is named based on its eventTime, converted to epoch milliseconds, with an optional prefix if configured.
  • Two constructors are available: one accepting both Storage and GcsTransportConfig and another solely accepting GcsTransportConfig.

Examples

transport:
  type: gcs
  bucketName: my-gcs-bucket
  fileNamePrefix: /file/name/prefix/
  credentialsFile: path/to/credentials.json

S3

To use this transport in your project, you need to include the following dependency in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.

Maven

<dependency>
  <groupId>io.openlineage</groupId>
  <artifactId>transports-s3</artifactId>
  <version>1.25.0</version>
</dependency>

Configuration

  • type - string, must be "s3". Required.
  • endpoint - string, the endpoint for an S3-compliant service like MinIO, Ceph, etc. Optional.
  • bucketName - string, the S3 bucket name. Required.
  • fileNamePrefix - string, prefix for the event file names. It is separated from the timestamp with an underscore. It can include a path and a file name prefix. Optional.

Credentials

To authenticate, the transport uses the default credentials provider chain. The possible authentication methods include:

  • Java system properties
  • Environment variables
  • Shared credentials config file (by default ~/.aws/config)
  • EC2 instance credentials (convenient in EMR and Glue)
  • and others

Refer to the documentation for details.

Behavior

  • Events are serialized to JSON and stored in the specified S3 bucket.
  • Each event file is named based on its eventTime, converted to epoch milliseconds, with an optional prefix if configured.
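The naming rule can be sketched as follows. The class EventFileNameSketch and its objectKey method are hypothetical, and the sketch covers only what the text states: the prefix, the underscore separator, and eventTime as epoch milliseconds. Any file extension the transport may append is deliberately left out, as is the behavior for an empty prefix, which is assumed here to yield the bare timestamp.

```java
import java.time.ZonedDateTime;

// Hypothetical sketch of deriving an object name from fileNamePrefix and eventTime.
final class EventFileNameSketch {

    static String objectKey(String fileNamePrefix, ZonedDateTime eventTime) {
        long epochMillis = eventTime.toInstant().toEpochMilli();
        if (fileNamePrefix == null || fileNamePrefix.isEmpty()) {
            return Long.toString(epochMillis); // assumption: no prefix means timestamp only
        }
        return fileNamePrefix + "_" + epochMillis;
    }

    public static void main(String[] args) {
        ZonedDateTime eventTime = ZonedDateTime.parse("2024-01-01T00:00:00Z");
        System.out.println(objectKey("my/service/events/event", eventTime));
    }
}
```

Since the name depends only on the prefix and the event time, two events with the same eventTime millisecond would map to the same name under this sketch.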

Examples

transport:
  type: s3
  endpoint: https://my-minio.example.com
  bucketName: events
  fileNamePrefix: my/service/events/event

Circuit Breakers

Info: This feature is available in OpenLineage versions >= 1.9.0.

To prevent over-instrumentation, the OpenLineage integration provides a circuit breaker mechanism that stops OpenLineage from creating, serializing, and sending OpenLineage events.

Simple Memory Circuit Breaker

A simple circuit breaker that works based only on the free memory within the JVM. The configuration should contain a free memory threshold (as a percentage); the default value is 20%. The circuit breaker closes on the first call if free memory is low. The circuitCheckIntervalInMillis parameter configures how often the circuit breaker is checked; it defaults to 1000 ms when absent from the config. timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout is reached (added in version 1.13).

circuitBreaker:
  type: simpleMemory
  memoryThreshold: 20
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
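The memory check can be sketched with java.lang.Runtime. The formula for the free-memory percentage is an assumption made for this example (free heap plus heap that has not been allocated yet, relative to the maximum heap); the actual circuit breaker may compute it differently. The sketch follows this page's wording, in which a closed circuit breaker means OpenLineage stops working.

```java
// Hypothetical sketch of a free-memory check; not the OpenLineage implementation.
final class SimpleMemorySketch {

    // Assumed formula: memory still available to the JVM as a percentage of the maximum heap.
    static double freeMemoryPercent(long freeMemory, long totalMemory, long maxMemory) {
        long available = freeMemory + (maxMemory - totalMemory);
        return 100.0 * available / maxMemory;
    }

    // Closed (OpenLineage stops) when free memory falls below memoryThreshold percent.
    static boolean isClosed(double freePercent, int memoryThreshold) {
        return freePercent < memoryThreshold;
    }

    public static void main(String[] args) {
        Runtime runtime = Runtime.getRuntime();
        double free = freeMemoryPercent(
                runtime.freeMemory(), runtime.totalMemory(), runtime.maxMemory());
        System.out.println("free=" + free + "% closed=" + isClosed(free, 20));
    }
}
```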

Java Runtime Circuit Breaker

A more complex version of the circuit breaker. The amount of free memory can be low as long as the amount of time spent on garbage collection is acceptable. JavaRuntimeCircuitBreaker closes when free memory drops below the threshold and the share of time spent on garbage collection exceeds the given threshold (10% by default). The circuit breaker is always open when checked for the first time, because the GC threshold is computed relative to the previous circuit breaker call. The circuitCheckIntervalInMillis parameter configures how often the circuit breaker is checked; it defaults to 1000 ms when absent from the config. timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout is reached (added in version 1.13).

circuitBreaker:
  type: javaRuntime
  memoryThreshold: 20
  gcCpuThreshold: 10
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
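The combined memory and GC condition can be sketched as below. This is an illustration under assumptions, not the JavaRuntimeCircuitBreaker source: the class name, the use of GarbageCollectorMXBean collection times, and the way the GC share is computed between two checks are all choices made for the example. As on the rest of this page, closed means OpenLineage stops working.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical sketch: closes only when memory is low AND GC time since the last check is high.
final class GcAwareBreakerSketch {
    private final int memoryThreshold; // percent of free memory
    private final int gcCpuThreshold;  // percent of elapsed time spent in GC
    private boolean hasPrevious = false;
    private long previousGcMillis;
    private long previousWallMillis;

    GcAwareBreakerSketch(int memoryThreshold, int gcCpuThreshold) {
        this.memoryThreshold = memoryThreshold;
        this.gcCpuThreshold = gcCpuThreshold;
    }

    // Accumulated GC time across all collectors, in milliseconds.
    static long totalGcMillis() {
        long sum = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sum += Math.max(0, gc.getCollectionTime()); // -1 means "undefined"
        }
        return sum;
    }

    boolean isClosed(double freePercent, long gcMillis, long wallMillis) {
        if (!hasPrevious) {
            remember(gcMillis, wallMillis);
            return false; // first check: no previous measurement to compare against
        }
        long elapsed = wallMillis - previousWallMillis;
        long gcDelta = gcMillis - previousGcMillis;
        remember(gcMillis, wallMillis);
        double gcPercent = elapsed > 0 ? 100.0 * gcDelta / elapsed : 0.0;
        return freePercent < memoryThreshold && gcPercent > gcCpuThreshold;
    }

    private void remember(long gcMillis, long wallMillis) {
        hasPrevious = true;
        previousGcMillis = gcMillis;
        previousWallMillis = wallMillis;
    }

    public static void main(String[] args) {
        GcAwareBreakerSketch breaker = new GcAwareBreakerSketch(20, 10);
        Runtime rt = Runtime.getRuntime();
        double free = 100.0 * (rt.freeMemory() + (rt.maxMemory() - rt.totalMemory())) / rt.maxMemory();
        // The first check always reports an open circuit breaker.
        System.out.println(breaker.isClosed(free, totalGcMillis(), System.currentTimeMillis()));
    }
}
```

Passing the measurements in as arguments keeps the decision step deterministic, which makes the thresholds easy to try out with fixed numbers.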

Custom Circuit Breaker

The list of available circuit breakers can be extended with a custom one, loaded via ServiceLoader, that provides its own implementation of io.openlineage.client.circuitBreaker.CircuitBreakerBuilder.