Apache Flink
Apache Flink is one of the most popular stream processing frameworks. Apache Flink jobs run on clusters, which are composed of two types of nodes: TaskManagers and JobManagers. While clusters typically consist of multiple TaskManagers, the only reason to run multiple JobManagers is high availability. Jobs are submitted to the JobManager by the JobClient, which compiles the user application into a dataflow graph that the JobManager can understand. The JobManager then coordinates job execution: it distributes the parallel units of a job to the TaskManagers, manages heartbeats, triggers checkpoints, reacts to failures, and much more.
Apache Flink has multiple deployment modes: Session Mode, Application Mode, and Per-Job Mode. The most popular are Session Mode and Application Mode. In Session Mode, a single JobManager manages multiple jobs that share one Flink cluster; the JobClient is executed on the machine that submits the job to the cluster. Application Mode is used when the cluster is dedicated to a single job; in this mode, the JobClient, where the main method runs, is executed on the JobManager.
Flink jobs read data from Sources and write data to Sinks. In contrast to systems like Apache Spark, Flink jobs can write data to multiple places: they can have multiple Sinks.
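For illustration, here is a minimal sketch of a job writing the same stream to two Kafka sinks; the broker address and topic names are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "c");

        // The same stream can feed more than one sink; each sink is a place
        // where the job writes data and thus a candidate for lineage.
        events.sinkTo(kafkaSink("topic-a"));
        events.sinkTo(kafkaSink("topic-b"));
        env.execute("multi-sink-job");
    }

    private static KafkaSink<String> kafkaSink(String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(topic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}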
Getting lineage from Flink
OpenLineage utilizes Flink's JobListener interface. Flink uses this interface to notify the user of job submission, successful job completion, or job failure. Implementations of this interface are executed on the JobClient.
When the OpenLineage listener receives information that a job was submitted, it extracts Transformations from the job's ExecutionEnvironment. The Transformations represent logical operations in the dataflow graph; they are composed of Flink's built-in operators as well as user-provided Sources, Sinks, and functions. To get the lineage, the OpenLineage integration processes the dataflow graph. Currently, OpenLineage is interested only in the information contained in Sources and Sinks, as these are the places where Flink interacts with external systems.
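For context, a minimal sketch of the JobListener contract is below; this illustrates where the callbacks run and is not OpenLineage's actual implementation, which extracts transformations at submission time:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

public class LoggingJobListener implements JobListener {

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // Runs on the JobClient after a job is submitted (throwable is non-null on failure).
        if (throwable == null) {
            System.out.println("Submitted job " + jobClient.getJobID());
        }
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // Runs when the job finishes successfully or fails.
        System.out.println(throwable == null ? "Job finished" : "Job failed");
    }
}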
After job submission, the OpenLineage integration starts actively listening to checkpoints; this gives insight into whether the job runs properly.
Limitations
Currently, OpenLineage's Flink integration is limited to getting information from jobs running in Application Mode.
The OpenLineage integration extracts lineage only from the following Sources and Sinks:
Sources | Sinks |
---|---|
KafkaSource | KafkaSink (1) |
FlinkKafkaConsumer | FlinkKafkaProducer |
IcebergFlinkSource | IcebergFlinkSink |
We expect this list to grow as we add support for more connectors.
(1) KafkaSink supports sinks that write to a single topic as well as multi-topic sinks. The limitation for multi-topic sinks is that the topics need to have the same schema and the implementation of KafkaRecordSerializationSchema must extend KafkaTopicsDescriptor. The methods isFixedTopics and getFixedTopics from KafkaTopicsDescriptor are used to extract the topic list from the sink.
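A minimal sketch of such a serializer is below, assuming KafkaTopicsDescriptor can be subclassed; the event type, topic names, and routing logic are hypothetical:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: both topics must share the same schema.
public class MultiTopicSerializationSchema extends KafkaTopicsDescriptor
        implements KafkaRecordSerializationSchema<String> {

    public MultiTopicSerializationSchema() {
        // The fixed topic list is what OpenLineage reads back through
        // isFixedTopics() and getFixedTopics() to extract lineage.
        super(Arrays.asList("topic-a", "topic-b"), null);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        // Route each record to one of the declared topics.
        String topic = element.hashCode() % 2 == 0 ? "topic-a" : "topic-b";
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}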
Usage
In your job, you need to set up the OpenLineageFlinkJobListener. For example:

import org.apache.flink.core.execution.JobListener;
import io.openlineage.flink.OpenLineageFlinkJobListener;

// Wrap the job's execution environment and register the listener so that
// OpenLineage is notified of job submission, completion, and failure.
JobListener listener = OpenLineageFlinkJobListener.builder()
    .executionEnvironment(streamExecutionEnvironment)
    .build();
streamExecutionEnvironment.registerJobListener(listener);
Also, OpenLineage needs certain parameters to be set in flink-conf.yaml:
Configuration Key | Description | Expected Value | Default |
---|---|---|---|
execution.attached | This setting needs to be true if OpenLineage is to detect job start and failure | true | false |
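For example, in flink-conf.yaml:

execution.attached: true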
The OpenLineage jar needs to be present on the JobManager.
When the JobListener is configured, you need to point the OpenLineage integration to where the events should end up. If you're using Marquez, the simplest way to do that is to set the OPENLINEAGE_URL environment variable to the Marquez URL. More advanced settings are in the client documentation.
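For example, assuming a local Marquez instance on its default port:

export OPENLINEAGE_URL=http://localhost:5000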
Configuring the OpenLineage connector
The Flink OpenLineage connector utilizes the standard Java client for OpenLineage and allows all the configuration features present there to be used. The configuration can be passed with:
- an openlineage.yml file, with the environment property OPENLINEAGE_CONFIG set and pointing to the configuration file. The file structure and allowed options are described here; see also the example after this list.
- standard Flink configuration, with the parameters defined below.
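For example (the path is illustrative):

export OPENLINEAGE_CONFIG=/path/to/openlineage.yml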
Flink Configuration parameters
The following parameters can be specified:
Parameter | Definition | Example |
---|---|---|
openlineage.transport.type | The transport type used for event emit, default type is console | http |
openlineage.facets.disabled | List of facets to disable, enclosed in [] (required from 0.21.x) and separated by ;. Default is [spark_unknown;spark.logicalPlan;] (currently must contain ;) | [some_facet1;some_facet1] |
openlineage.job.owners.<ownership-type> | Specifies ownership of the job. Multiple entries with different types are allowed. Config key name and value are used to create job ownership type and name (available since 1.13). | openlineage.job.owners.team="Some Team" |
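For example, the following Flink configuration entries (values illustrative) set the transport type and a job owner:

openlineage.transport.type=http
openlineage.job.owners.team="Some Team"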
Transports
Tip: See current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint, using ApacheHTTPClient.
Configuration
- type - string, must be "http". Required.
- url - string, base URL for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
- urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
- timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
- auth - dictionary specifying authentication options. Optional; by default, no authorization is used. If set, requires the type property.
  - type - string specifying "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
  - apiKey - string setting the Authentication HTTP header as the Bearer. Required if type is api_key.
- headers - dictionary specifying HTTP request headers. Optional.
- compression - string, name of the algorithm used by the HTTP client to compress the request body. Optional, default value null; allowed values: gzip. Added in v1.13.0.
Behavior
Events are serialized to JSON, and then sent as an HTTP POST request with Content-Type: application/json.
Examples
The same configuration is shown below as Yaml config, Spark config, Flink config, and Java code.
Yaml Config
Anonymous connection:
transport:
type: http
url: http://localhost:5000
With authorization:
transport:
type: http
url: http://localhost:5000
auth:
type: api_key
api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
transport:
type: http
url: http://localhost:5000
endpoint: /api/v1/lineage
urlParams:
param0: value0
param1: value1
timeoutInMillis: 5000
auth:
type: api_key
api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
headers:
X-Some-Extra-Header: abc
compression: gzip
Spark Config
Anonymous connection:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
With authorization:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1/lineage
spark.openlineage.transport.urlParams.param0=value0
spark.openlineage.transport.urlParams.param1=value1
spark.openlineage.transport.timeoutInMillis=5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
spark.openlineage.transport.headers.X-Some-Extra-Header=abc
spark.openlineage.transport.compression=gzip
URL parsing within Spark integration
You can supply HTTP parameters using values in the URL. The parsed spark.openlineage.* properties are located in the URL as follows:
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
example:
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
Flink Config
Anonymous connection:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
With authorization:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.endpoint=/api/v1/lineage
openlineage.transport.urlParams.param0=value0
openlineage.transport.urlParams.param1=value1
openlineage.transport.timeoutInMillis=5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
openlineage.transport.headers.X-Some-Extra-Header=abc
openlineage.transport.compression=gzip
Java Code
Anonymous connection:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
With authorization:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setAuth(apiKeyTokenProvider);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
Full example:
import java.util.Map;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
Map<String, String> queryParams = Map.of(
"param0", "value0",
"param1", "value1"
);
Map<String, String> headers = Map.of(
"X-Some-Extra-Header", "abc"
);
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setEndpoint("/api/v1/lineage");
httpConfig.setUrlParams(queryParams);
httpConfig.setAuth(apiKeyTokenProvider);
httpConfig.setTimeoutInMillis(5000);
httpConfig.setHeaders(headers);
httpConfig.setCompression(HttpConfig.Compression.GZIP);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
Kafka
If the transport type is set to kafka, then the parameters below are read and used when building the KafkaProducer. This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kafka". Required.
- topicName - string specifying the topic to which events will be sent. Required.
- properties - a dictionary containing a Kafka producer config as in the Kafka producer config. Required.
- localServerId - deprecated, renamed to messageKey since v1.13.0.
- messageKey - string, key for all Kafka messages produced by the transport. Optional, default value described below. Added in v1.13.0.
Default values for messageKey are:
- run:{parentJob.namespace}/{parentJob.name} - for RunEvent with parent facet
- run:{job.namespace}/{job.name} - for RunEvent
- job:{job.namespace}/{job.name} - for JobEvent
- dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
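A minimal openlineage.yml sketch for this transport; the topic name, broker address, serializer settings, and message key are illustrative:

transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  messageKey: run:my-namespace/my-job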
Behavior
Events are serialized to JSON, and then dispatched to the Kafka topic.
Notes
It is recommended to provide messageKey if a job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, e.g. Airflow task -> Spark application -> Spark task runs.