Configuration
We recommend configuring the client with an openlineage.yml file that contains all the details of how to connect to your OpenLineage backend.
You can make this file available to the client in three ways (listed in order of precedence):
- Set an OPENLINEAGE_CONFIG environment variable to a file path: OPENLINEAGE_CONFIG=path/to/openlineage.yml.
- Place an openlineage.yml in the user's current working directory.
- Place an openlineage.yml under .openlineage/ in the user's home directory (~/.openlineage/openlineage.yml).
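The lookup order above can be pictured with a minimal Java sketch. It is not the client's actual implementation: the class name ConfigLocator and its locate method are hypothetical, and the environment, working directory and home directory are passed in as parameters so the logic can be exercised in isolation. It also assumes that an explicitly set OPENLINEAGE_CONFIG path is used as given.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper illustrating the documented precedence; not part of the OpenLineage API.
final class ConfigLocator {

  static Optional<Path> locate(Map<String, String> env, Path workingDir, Path homeDir) {
    // 1. An explicit OPENLINEAGE_CONFIG path wins (assumption: taken as given when set).
    String explicit = env.get("OPENLINEAGE_CONFIG");
    if (explicit != null && !explicit.isEmpty()) {
      return Optional.of(Paths.get(explicit));
    }
    // 2. openlineage.yml in the current working directory.
    Path inWorkingDir = workingDir.resolve("openlineage.yml");
    if (Files.exists(inWorkingDir)) {
      return Optional.of(inWorkingDir);
    }
    // 3. ~/.openlineage/openlineage.yml
    Path inHome = homeDir.resolve(".openlineage").resolve("openlineage.yml");
    if (Files.exists(inHome)) {
      return Optional.of(inHome);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Optional<Path> found = locate(
        System.getenv(),
        Paths.get(System.getProperty("user.dir")),
        Paths.get(System.getProperty("user.home")));
    System.out.println(found.map(Path::toString).orElse("no openlineage.yml found"));
  }
}
```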
Environment Variables
The following environment variables are available:
Name | Description | Since |
---|---|---|
OPENLINEAGE_CONFIG | The path to the YAML configuration file. Example: path/to/openlineage.yml | |
OPENLINEAGE_DISABLED | When true , OpenLineage will not emit events. | 0.9.0 |
You can also configure the client with dynamic environment variables.
Configuring OpenLineage Client via Dynamic Environment Variables
These environment variables must begin with OPENLINEAGE__, followed by sections of the configuration separated by a double underscore (__).
All values in the environment variables are automatically converted to lowercase, and variable names using snake_case (single underscore) are converted into camelCase within the final configuration.
Key Features
- Prefix Requirement: All environment variables must begin with OPENLINEAGE__.
- Sections Separation: Configuration sections are separated using double underscores (__) to form the hierarchy.
- Lowercase Conversion: Environment variable values are automatically converted to lowercase.
- CamelCase Conversion: Any environment variable name using a single underscore (_) will be converted to camelCase in the final configuration.
- JSON String Support: You can pass a JSON string at any level of the configuration hierarchy, which will be merged into the final configuration structure.
- Hyphen Restriction: You cannot use a hyphen (-) in environment variable names. If a name strictly requires a hyphen, use a JSON string as the value of the environment variable.
- Precedence Rules:
  - Top-level keys have precedence and will not be overwritten by more nested entries.
  - For example, OPENLINEAGE__TRANSPORT='{..}' will not have its keys overwritten by OPENLINEAGE__TRANSPORT__AUTH__KEY='key'.
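As a rough illustration of the naming rules above, the following Java sketch turns a variable name into a configuration path and places a value into a nested map. The class DynamicEnvVars and its methods are hypothetical and are not the client's parser; JSON values, value lowercasing and the precedence rules are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of the naming rules; not the client's actual parser.
final class DynamicEnvVars {
  static final String PREFIX = "OPENLINEAGE__";

  // "OPENLINEAGE__TRANSPORT__AUTH__API_KEY" becomes ["transport", "auth", "apiKey"].
  static List<String> toPath(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Not a dynamic OpenLineage variable: " + name);
    }
    List<String> path = new ArrayList<>();
    // Double underscores separate the sections of the hierarchy.
    for (String section : name.substring(PREFIX.length()).split("__")) {
      path.add(toCamelCase(section.toLowerCase(Locale.ROOT)));
    }
    return path;
  }

  // Converts snake_case to camelCase: each single underscore capitalizes the next letter.
  static String toCamelCase(String snake) {
    StringBuilder out = new StringBuilder();
    boolean upperNext = false;
    for (char c : snake.toCharArray()) {
      if (c == '_') {
        upperNext = true;
      } else {
        out.append(upperNext ? Character.toUpperCase(c) : c);
        upperNext = false;
      }
    }
    return out.toString();
  }

  // Places a value into a nested map, creating intermediate maps along the path.
  @SuppressWarnings("unchecked")
  static void put(Map<String, Object> root, List<String> path, Object value) {
    Map<String, Object> node = root;
    for (String key : path.subList(0, path.size() - 1)) {
      node = (Map<String, Object>) node.computeIfAbsent(key, k -> new LinkedHashMap<String, Object>());
    }
    node.put(path.get(path.size() - 1), value);
  }

  public static void main(String[] args) {
    Map<String, Object> config = new LinkedHashMap<>();
    put(config, toPath("OPENLINEAGE__TRANSPORT__TYPE"), "http");
    put(config, toPath("OPENLINEAGE__TRANSPORT__AUTH__API_KEY"), "random_token");
    System.out.println(config);
  }
}
```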
Examples
- Basic Example
- Composite Example
- Precedence Example
- Spark Example
- Namespace Resolvers Example
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=http
OPENLINEAGE__TRANSPORT__URL=http://localhost:5050
OPENLINEAGE__TRANSPORT__ENDPOINT=/api/v1/lineage
OPENLINEAGE__TRANSPORT__AUTH__TYPE=api_key
OPENLINEAGE__TRANSPORT__AUTH__API_KEY=random_token
OPENLINEAGE__TRANSPORT__COMPRESSION=gzip
is equivalent to passing the following YAML configuration:
transport:
  type: http
  url: http://localhost:5050
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    apiKey: random_token
  compression: gzip
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=composite
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__TYPE=http
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__URL=http://localhost:5050
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__ENDPOINT=/api/v1/lineage
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__AUTH__TYPE=api_key
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__AUTH__API_KEY=random_token
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__COMPRESSION=gzip
OPENLINEAGE__TRANSPORT__TRANSPORTS__SECOND__TYPE=console
is equivalent to passing the following YAML configuration:
transport:
  type: composite
  transports:
    first:
      type: http
      url: http://localhost:5050
      endpoint: /api/v1/lineage
      auth:
        type: api_key
        apiKey: random_token
      compression: gzip
    second:
      type: console
Setting the following environment variables:
OPENLINEAGE__TRANSPORT='{"type":"console"}'
OPENLINEAGE__TRANSPORT__TYPE=http
is equivalent to passing the following YAML configuration:
transport:
  type: console
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=kafka
OPENLINEAGE__TRANSPORT__TOPIC_NAME=test
OPENLINEAGE__TRANSPORT__MESSAGE_KEY=explicit-key
OPENLINEAGE__TRANSPORT__PROPERTIES='{"key.serializer": "org.apache.kafka.common.serialization.StringSerializer"}'
is equivalent to passing the following YAML configuration:
transport:
  type: kafka
  topicName: test
  messageKey: explicit-key
  properties:
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
Please note that you can't use environment variables to set Spark properties, as they are not part of the configuration hierarchy. The following environment variable:
OPENLINEAGE__TRANSPORT__PROPERTIES__KEY__SERIALIZER="org.apache.kafka.common.serialization.StringSerializer"
would be equivalent to the YAML structure below:
transport:
  properties:
    key:
      serializer: org.apache.kafka.common.serialization.StringSerializer
which is not a valid configuration for Spark.
Setting the following environment variables:
OPENLINEAGE__DATASET__NAMESPACE_RESOLVERS__RESOLVED_NAME__TYPE=hostList
OPENLINEAGE__DATASET__NAMESPACE_RESOLVERS__RESOLVED_NAME__HOSTS='["kafka-prod13.company.com", "kafka-prod15.company.com"]'
OPENLINEAGE__DATASET__NAMESPACE_RESOLVERS__RESOLVED_NAME__SCHEMA=kafka
is equivalent to passing the following YAML configuration:
dataset:
  namespaceResolvers:
    resolvedName:
      type: hostList
      hosts:
        - kafka-prod13.company.com
        - kafka-prod15.company.com
      schema: kafka
Facets Configuration
In the YAML configuration file, you can also disable facets to filter them out of the OpenLineage event.
YAML Configuration
transport:
  type: console
facets:
  spark_unknown:
    disabled: true
  spark:
    logicalPlan:
      disabled: true
Deprecated syntax
The following syntax is deprecated and will soon be removed:
transport:
  type: console
facets:
  disabled:
    - spark_unknown
    - spark.logicalPlan
The rationale behind deprecation is that some of the facets were disabled by default in some integrations. When we added something extra but didn't include the defaults, they were unintentionally enabled.
Transports
Tip: See current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint, using ApacheHTTPClient.
Configuration
- type - string, must be "http". Required.
- url - string, base url for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
- urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
- timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
- auth - dictionary specifying authentication options. Optional, by default no authorization is used. If set, requires the type property.
  - type - string specifying "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
  - apiKey - string setting the Authentication HTTP header as the Bearer. Required if type is api_key.
- headers - dictionary specifying HTTP request headers. Optional.
- compression - string, name of the algorithm used by the HTTP client to compress the request body. Optional, default value null, allowed values: gzip. Added in v1.13.0.
Behavior
Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
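To make the request shape concrete, here is a small sketch that builds (but does not send) such a POST request. It uses the JDK's java.net.http API (Java 11+) purely for illustration, not the Apache client mentioned above, and the class LineagePostSketch with its methods is hypothetical. How the transport itself joins url and endpoint may differ in detail.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustration only: built with java.net.http, not with the client the transport uses.
final class LineagePostSketch {

  // Appends the endpoint to the base url, avoiding a doubled or missing slash.
  static URI lineageUri(String url, String endpoint) {
    String base = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
    String path = endpoint.startsWith("/") ? endpoint : "/" + endpoint;
    return URI.create(base + path);
  }

  // Builds a POST request carrying the already serialized event as a JSON body.
  static HttpRequest buildRequest(String url, String endpoint, String eventJson) {
    return HttpRequest.newBuilder(lineageUri(url, endpoint))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(eventJson))
        .build();
  }

  public static void main(String[] args) {
    HttpRequest request = buildRequest(
        "http://localhost:5000", "/api/v1/lineage", "{\"eventType\":\"START\"}");
    System.out.println(request.method() + " " + request.uri());
  }
}
```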
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Anonymous connection:
transport:
  type: http
  url: http://localhost:5000
With authorization:
transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
  compression: gzip
Anonymous connection:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
With authorization:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1/lineage
spark.openlineage.transport.urlParams.param0=value0
spark.openlineage.transport.urlParams.param1=value1
spark.openlineage.transport.timeoutInMillis=5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
spark.openlineage.transport.headers.X-Some-Extra-Header=abc
spark.openlineage.transport.compression=gzip
URL parsing within Spark integration
You can supply HTTP parameters as values in the URL. The parsed spark.openlineage.* properties are located in the URL as follows:
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
example:
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
Anonymous connection:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
With authorization:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.endpoint=/api/v1/lineage
openlineage.transport.urlParams.param0=value0
openlineage.transport.urlParams.param1=value1
openlineage.transport.timeoutInMillis=5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
openlineage.transport.headers.X-Some-Extra-Header=abc
openlineage.transport.compression=gzip
Anonymous connection:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
With authorization:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setAuth(apiKeyTokenProvider);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
Full example:
import java.util.Map;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
Map<String, String> queryParams = Map.of(
"param0", "value0",
"param1", "value1"
);
Map<String, String> headers = Map.of(
"X-Some-Extra-Header", "abc"
);
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setEndpoint("/api/v1/lineage");
httpConfig.setUrlParams(queryParams);
httpConfig.setAuth(apiKeyTokenProvider);
httpConfig.setTimeoutInMillis(5000);
httpConfig.setHeaders(headers);
httpConfig.setCompression(HttpConfig.Compression.GZIP);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new HttpTransport(httpConfig))
.build();
Kafka
If a transport type is set to kafka, then the below parameters would be read and used when building KafkaProducer.
This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kafka". Required.
- topicName - string specifying the topic to which events will be sent. Required.
- properties - a dictionary containing a Kafka producer config as in Kafka producer config. Required.
- localServerId - deprecated, renamed to messageKey since v1.13.0.
- messageKey - string, key for all Kafka messages produced by transport. Optional, default value described below. Added in v1.13.0.

Default values for messageKey are:
- run:{parentJob.namespace}/{parentJob.name} - for RunEvent with parent facet
- run:{job.namespace}/{job.name} - for RunEvent
- job:{job.namespace}/{job.name} - for JobEvent
- dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
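The default key formats listed above can be expressed as a short Java sketch. DefaultMessageKeys and its methods are hypothetical helpers written for this illustration, not the transport's code; the point is that a run with a parent facet takes its key from the parent job, so parent and child events end up with the same key.

```java
// Hypothetical helper mirroring the documented default key formats; not the transport's code.
final class DefaultMessageKeys {

  // parentNamespace and parentName are null when the run has no parent facet.
  static String forRunEvent(String jobNamespace, String jobName,
                            String parentNamespace, String parentName) {
    if (parentNamespace != null && parentName != null) {
      return "run:" + parentNamespace + "/" + parentName;
    }
    return "run:" + jobNamespace + "/" + jobName;
  }

  static String forJobEvent(String jobNamespace, String jobName) {
    return "job:" + jobNamespace + "/" + jobName;
  }

  static String forDatasetEvent(String datasetNamespace, String datasetName) {
    return "dataset:" + datasetNamespace + "/" + datasetName;
  }

  public static void main(String[] args) {
    // A child run keyed by its parent job, then a run without a parent facet.
    System.out.println(forRunEvent("spark", "etl.step", "airflow", "daily_dag.etl_task"));
    System.out.println(forRunEvent("spark", "etl.step", null, null));
  }
}
```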
Behavior
Events are serialized to JSON, and then dispatched to the Kafka topic.
Notes
It is recommended to provide messageKey if Job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, like Airflow task -> Spark application -> Spark task runs.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  messageKey: some-value
spark.openlineage.transport.type=kafka
spark.openlineage.transport.topicName=openlineage.events
spark.openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.properties.acks=all
spark.openlineage.transport.properties.retries=3
spark.openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.messageKey=some-value
openlineage.transport.type=kafka
openlineage.transport.topicName=openlineage.events
openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.properties.acks=all
openlineage.transport.properties.retries=3
openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.messageKey=some-value
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,another.host:9092");
kafkaProperties.setProperty("acks", "all");
kafkaProperties.setProperty("retries", "3");
kafkaProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.setTopicName("openlineage.events");
kafkaConfig.setProperties(kafkaProperties);
// localServerId is deprecated and was renamed to messageKey in v1.13.0
kafkaConfig.setLocalServerId("some-value");
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        new KafkaTransport(kafkaConfig))
    .build();
Notes:
It is recommended to provide messageKey if Job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, like Airflow task -> Spark application.
Default values are:
- run:{parentJob.namespace}/{parentJob.name}/{parentRun.id} - for RunEvent with parent facet
- run:{job.namespace}/{job.name}/{run.id} - for RunEvent
- job:{job.namespace}/{job.name} - for JobEvent
- dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
Kinesis
If a transport type is set to kinesis, then the below parameters would be read and used when building KinesisProducer.
Also, KinesisTransport depends on you to provide the artifact com.amazonaws:amazon-kinesis-producer:0.14.0 or compatible on your classpath.
Configuration
- type - string, must be "kinesis". Required.
- streamName - the name of the Kinesis stream. Required.
- region - the region of the Kinesis stream. Required.
- roleArn - the roleArn which is allowed to read/write to the Kinesis stream. Optional.
- properties - a dictionary that contains Kinesis allowed properties. Optional.
Behavior
- Events are serialized to JSON, and then dispatched to the Kinesis stream.
- The partition key is generated as {jobNamespace}:{jobName}.
- Two constructors are available: one accepting both KinesisProducer and KinesisConfig and another solely accepting KinesisConfig.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name
  properties:
    VerifyCertificate: true
    ConnectTimeout: 6000
spark.openlineage.transport.type=kinesis
spark.openlineage.transport.streamName=your_kinesis_stream_name
spark.openlineage.transport.region=your_aws_region
spark.openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
spark.openlineage.transport.properties.VerifyCertificate=true
spark.openlineage.transport.properties.ConnectTimeout=6000
openlineage.transport.type=kinesis
openlineage.transport.streamName=your_kinesis_stream_name
openlineage.transport.region=your_aws_region
openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
openlineage.transport.properties.VerifyCertificate=true
openlineage.transport.properties.ConnectTimeout=6000
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KinesisConfig;
import io.openlineage.client.transports.KinesisTransport;
Properties kinesisProperties = new Properties();
kinesisProperties.setProperty("property_name_1", "value_1");
kinesisProperties.setProperty("property_name_2", "value_2");
KinesisConfig kinesisConfig = new KinesisConfig();
kinesisConfig.setStreamName("your_kinesis_stream_name");
kinesisConfig.setRegion("your_aws_region");
kinesisConfig.setRoleArn("arn:aws:iam::account-id:role/role-name");
kinesisConfig.setProperties(kinesisProperties);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new KinesisTransport(kinesisConfig))
.build();
Console
This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.
Behavior
Events are serialized to JSON. Then each event is logged at INFO level to the logger named ConsoleTransport.
Notes
Be cautious when using the DEBUG log level, as it might result in double-logging due to the OpenLineageClient also logging.
Configuration
- type - string, must be "console". Required.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: console
spark.openlineage.transport.type=console
openlineage.transport.type=console
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ConsoleTransport;
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new ConsoleTransport())
.build();
File
Designed mainly for integration testing, the FileTransport emits OpenLineage events to a given file.
Configuration
- type - string, must be "file". Required.
- location - string specifying the path of the file. Required.
Behavior
- If the target file is absent, it's created.
- Events are serialized to JSON, and then appended to a file, separated by newlines.
- Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
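The behavior described above amounts to writing newline-delimited JSON. A minimal sketch, assuming nothing about the real FileTransport internals (FileAppendSketch and its methods are hypothetical names), could look like this:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of append-one-line-per-event; not the FileTransport implementation.
final class FileAppendSketch {

  // Removes line breaks so that one event occupies exactly one line.
  static String toSingleLine(String eventJson) {
    return eventJson.replace("\r", "").replace("\n", "");
  }

  // Creates the file when it is absent, otherwise appends to it.
  static void append(Path file, String eventJson) throws IOException {
    String line = toSingleLine(eventJson) + System.lineSeparator();
    Files.write(file, line.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("openlineage-events", ".json");
    append(file, "{\n  \"eventType\": \"START\"\n}");
    append(file, "{\n  \"eventType\": \"COMPLETE\"\n}");
    // Each appended event should now be readable as one line.
    for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
      System.out.println(line);
    }
  }
}
```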
Notes for Yarn/Kubernetes
This transport type is of little use for Spark/Flink applications deployed to a Yarn or Kubernetes cluster:
- Each executor writes the file to the local filesystem of its Yarn container or K8s pod, so the resulting file is removed when that container or pod is destroyed.
- Kubernetes persistent volumes are not destroyed after pod removal, but all the executors will write to the same network disk in parallel, producing a broken file.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: file
  location: /path/to/your/file
spark.openlineage.transport.type=file
spark.openlineage.transport.location=/path/to/your/file
openlineage.transport.type=file
openlineage.transport.location=/path/to/your/file
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.FileConfig;
import io.openlineage.client.transports.FileTransport;
FileConfig fileConfig = new FileConfig("/path/to/your/file");
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new FileTransport(fileConfig))
.build();
Composite
The CompositeTransport is designed to combine multiple transports, allowing event emission to several destinations. This is useful when events need to be sent to multiple targets, such as a logging system and an API endpoint. The events are delivered sequentially, one after another in a defined order.
Configuration
- type - string, must be "composite". Required.
- transports - a list or a map of transport configurations. Required.
- continueOnFailure - boolean flag, determines if the process should continue even when one of the transports fails. Default is false.
Behavior
- The configured transports will be initialized and used in sequence to emit OpenLineage events.
- If continueOnFailure is set to false, a failure in one transport will stop the event emission process, and an exception will be raised.
- If continueOnFailure is true, the failure will be logged, but the remaining transports will still attempt to send the event.
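The two continueOnFailure modes can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the CompositeTransport source: the Emitter interface and the emitAll method are hypothetical stand-ins, and tolerated failures are collected in a list where the real transport is described as logging them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of sequential emission; not the CompositeTransport implementation.
final class CompositeSketch {

  // Stand-in for a transport; the real interface lives in the OpenLineage client.
  interface Emitter {
    void emit(String eventJson);
  }

  // Emits to each target in order and returns the failures that were tolerated.
  static List<RuntimeException> emitAll(
      List<Emitter> targets, String eventJson, boolean continueOnFailure) {
    List<RuntimeException> tolerated = new ArrayList<>();
    for (Emitter target : targets) {
      try {
        target.emit(eventJson);
      } catch (RuntimeException e) {
        if (!continueOnFailure) {
          throw e; // stop here: later targets never see the event
        }
        tolerated.add(e); // keep going with the remaining targets
      }
    }
    return tolerated;
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    List<Emitter> targets = Arrays.asList(
        event -> { throw new IllegalStateException("first transport is down"); },
        delivered::add);
    List<RuntimeException> failures = emitAll(targets, "{\"eventType\":\"START\"}", true);
    System.out.println(failures.size() + " failure(s), " + delivered.size() + " delivered");
  }
}
```

With continueOnFailure set to false, the same call would rethrow the first exception and the second target would receive nothing.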
Notes for Multiple Transports
The composite transport can be used with any OpenLineage transport (e.g. HttpTransport, KafkaTransport, etc).
It is ideal for scenarios where OpenLineage events need to reach multiple destinations for redundancy or different types of processing.
The transports configuration can be provided in two formats:
- A list of transport configurations, where each transport may optionally include a name field.
- A map of transport configurations, where the key acts as the name for each transport. The map format is particularly useful for configurations set via environment variables or Java properties, providing a more convenient and flexible setup.
Why are transport names used?
Transport names are not required for basic functionality. Their primary purpose is to enable configuration of composite transports via environment variables, which is only supported when names are defined.
Examples
- Yaml Config (List)
- Yaml Config (Map)
- Spark Config
- Flink Config
- Java Code
transport:
  type: composite
  continueOnFailure: true
  transports:
    - type: http
      url: http://example.com/api
      name: my_http
    - type: kafka
      topicName: openlineage.events
      properties:
        bootstrap.servers: localhost:9092,another.host:9092
        acks: all
        retries: 3
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
      messageKey: some-value
transport:
  type: composite
  continueOnFailure: true
  transports:
    my_http:
      type: http
      url: http://example.com/api
      name: my_http
    my_kafka:
      type: kafka
      topicName: openlineage.events
      properties:
        bootstrap.servers: localhost:9092,another.host:9092
        acks: all
        retries: 3
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
      messageKey: some-value
spark.openlineage.transport.type=composite
spark.openlineage.transport.continueOnFailure=true
spark.openlineage.transport.transports.my_http.type=http
spark.openlineage.transport.transports.my_http.url=http://example.com/api
spark.openlineage.transport.transports.my_kafka.type=kafka
spark.openlineage.transport.transports.my_kafka.topicName=openlineage.events
spark.openlineage.transport.transports.my_kafka.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.transports.my_kafka.properties.acks=all
spark.openlineage.transport.transports.my_kafka.properties.retries=3
spark.openlineage.transport.transports.my_kafka.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.transports.my_kafka.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.type=composite
openlineage.transport.continueOnFailure=true
openlineage.transport.transports.my_http.type=http
openlineage.transport.transports.my_http.url=http://example.com/api
openlineage.transport.transports.my_kafka.type=kafka
openlineage.transport.transports.my_kafka.topicName=openlineage.events
openlineage.transport.transports.my_kafka.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.transports.my_kafka.properties.acks=all
openlineage.transport.transports.my_kafka.properties.retries=3
openlineage.transport.transports.my_kafka.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.transports.my_kafka.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
import java.util.Arrays;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.CompositeConfig;
import io.openlineage.client.transports.CompositeTransport;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://example.com/api");
KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.setTopicName("openlineage.events");
// localServerId is deprecated and was renamed to messageKey in v1.13.0
kafkaConfig.setLocalServerId("some-value");
CompositeConfig compositeConfig = new CompositeConfig(Arrays.asList(
new HttpTransport(httpConfig),
new KafkaTransport(kafkaConfig)
), true);
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new CompositeTransport(compositeConfig))
.build();
Dataplex
To use this transport in your project, you need to include the io.openlineage:transports-dataplex artifact in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.
Configuration
- type - string, must be "dataplex". Required.
- endpoint - string, specifies the endpoint to which events are sent, default value is datalineage.googleapis.com:443. Optional.
- projectId - string, the project quota identifier. If not provided, it is determined based on user credentials. Optional.
- location - string, Dataplex location. Optional, default: "us".
- credentialsFile - string, path to the Service Account credentials JSON file. Optional, if not provided Application Default Credentials are used.
- mode - enum that specifies the type of client used for publishing OpenLineage events to Dataplex. Possible values: sync (synchronous) or async (asynchronous). Optional, default: async.
Behavior
- Events are serialized to JSON, included as part of a gRPC request, and then dispatched to the Dataplex endpoint.
- Depending on the mode chosen, requests are sent using either a synchronous or asynchronous client.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: dataplex
  projectId: your_gcp_project_id
  location: us
  mode: sync
  credentialsFile: path/to/credentials.json
spark.openlineage.transport.type=dataplex
spark.openlineage.transport.projectId=your_gcp_project_id
spark.openlineage.transport.location=us
spark.openlineage.transport.mode=sync
spark.openlineage.transport.credentialsFile=path/to/credentials.json
openlineage.transport.type=dataplex
openlineage.transport.projectId=your_gcp_project_id
openlineage.transport.location=us
openlineage.transport.mode=sync
openlineage.transport.credentialsFile=path/to/credentials.json
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.dataplex.DataplexConfig;
import io.openlineage.client.transports.dataplex.DataplexTransport;
DataplexConfig dataplexConfig = new DataplexConfig();
dataplexConfig.setProjectId("your_gcp_project_id");
dataplexConfig.setLocation("us");
dataplexConfig.setMode("sync");
dataplexConfig.setCredentialsFile("path/to/credentials.json");
OpenLineageClient client = OpenLineageClient.builder()
.transport(
new DataplexTransport(dataplexConfig))
.build();
Google Cloud Storage
To use this transport in your project, you need to include the io.openlineage:transports-gcs artifact in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.
Configuration
- type - string, must be "gcs". Required.
- projectId - string, the project quota identifier. Required.
- credentialsFile - string, path to the Service Account credentials JSON file. Optional, if not provided Application Default Credentials are used.
- bucketName - string, the GCS bucket name. Required.
- fileNamePrefix - string, prefix for the event file names. Optional.
Behavior
- Events are serialized to JSON and stored in the specified GCS bucket.
- Each event file is named based on its eventTime, converted to epoch milliseconds, with an optional prefix if configured.
- Two constructors are available: one accepting both Storage and GcsTransportConfig and another solely accepting GcsTransportConfig.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: gcs
  bucketName: my-gcs-bucket
  fileNamePrefix: /file/name/prefix/
  credentialsFile: path/to/credentials.json
spark.openlineage.transport.type=gcs
spark.openlineage.transport.bucketName=my-gcs-bucket
spark.openlineage.transport.credentialsFile=path/to/credentials.json
spark.openlineage.transport.fileNamePrefix=file/name/prefix/
openlineage.transport.type=gcs
openlineage.transport.bucketName=my-gcs-bucket
openlineage.transport.credentialsFile=path/to/credentials.json
openlineage.transport.fileNamePrefix=file/name/prefix/
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.gcs.GcsTransportConfig;
import io.openlineage.client.transports.gcs.GcsTransport;
GcsTransportConfig gcsConfig = new GcsTransportConfig();
gcsConfig.setBucketName("my-bucket-name");
gcsConfig.setFileNamePrefix("/file/name/prefix/");
gcsConfig.setCredentialsFile("path/to/credentials.json");
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        new GcsTransport(gcsConfig))
.build();
S3
To use this transport in your project, you need to include the following dependency in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.
Maven
<dependency>
  <groupId>io.openlineage</groupId>
  <artifactId>transports-s3</artifactId>
  <version>YOUR_VERSION_HERE</version>
</dependency>
Configuration
- type - string, must be "s3". Required.
- endpoint - string, the endpoint for an S3 compliant service like MinIO, Ceph, etc. Optional.
- bucketName - string, the S3 bucket name. Required.
- fileNamePrefix - string, prefix for the event file names. It is separated from the timestamp with an underscore. It can include a path and a file name prefix. Optional.
Credentials
To authenticate, the transport uses the default credentials provider chain. The possible authentication methods include:
- Java system properties
- Environment variables
- Shared credentials config file (by default ~/.aws/config)
- EC2 instance credentials (convenient in EMR and Glue)
- and others
Refer to the documentation for details.
Behavior
- Events are serialized to JSON and stored in the specified S3 bucket.
- Each event file is named based on its eventTime, converted to epoch milliseconds, with an optional prefix if configured.
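The naming rule above can be illustrated with a short, standalone sketch. It is not the transport's actual implementation: the class name `S3EventKeySketch`, the method `objectKey`, and the `.json` suffix are assumptions made for this example. Only the underscore separator and the epoch-millisecond timestamp come from the description above.

```java
import java.time.ZonedDateTime;

/** Illustrative sketch only; names and the ".json" suffix are assumptions. */
final class S3EventKeySketch {

  /** Builds an object key from an optional prefix and the event time. */
  static String objectKey(String fileNamePrefix, ZonedDateTime eventTime) {
    // eventTime is converted to epoch milliseconds.
    long epochMillis = eventTime.toInstant().toEpochMilli();
    if (fileNamePrefix == null || fileNamePrefix.isEmpty()) {
      return epochMillis + ".json";
    }
    // The prefix is separated from the timestamp with an underscore.
    return fileNamePrefix + "_" + epochMillis + ".json";
  }

  public static void main(String[] args) {
    ZonedDateTime eventTime = ZonedDateTime.parse("2024-01-01T00:00:00Z");
    System.out.println(objectKey("my/service/events/event", eventTime));
    System.out.println(objectKey(null, eventTime));
  }
}
```

Under these assumptions, a prefix of my/service/events/event and an event time of 2024-01-01T00:00:00Z give the key my/service/events/event_1704067200000.json.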
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: s3
  endpoint: https://my-minio.example.com
  bucketName: events
  fileNamePrefix: my/service/events/event
spark.openlineage.transport.type=s3
spark.openlineage.transport.endpoint=https://my-minio.example.com
spark.openlineage.transport.bucketName=events
spark.openlineage.transport.fileNamePrefix=my/service/events/event
openlineage.transport.type=s3
openlineage.transport.endpoint=https://my-minio.example.com
openlineage.transport.bucketName=events
openlineage.transport.fileNamePrefix=my/service/events/event
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.s3.S3TransportConfig;
import io.openlineage.client.transports.s3.S3Transport;
S3TransportConfig s3Config = new S3TransportConfig();
s3Config.setEndpoint("https://my-minio.example.com");
s3Config.setBucketName("events");
s3Config.setFileNamePrefix("my/service/events/event");
OpenLineageClient client = OpenLineageClient.builder()
    .transport(new S3Transport(s3Config))
    .build();
Error Handling via Transport
// Connect to http://localhost:5000
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .uri("http://localhost:5000")
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .registerErrorHandler(new EmitErrorHandler() {
      @Override
      public void handleError(Throwable throwable) {
        // Handle emit error here
      }
    }).build();
Defining Your Own Transport
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        new MyTransport() {
          @Override
          public void emit(OpenLineage.RunEvent runEvent) {
            // Add emit logic here
          }
        }).build();
Circuit Breakers
This feature is available in OpenLineage versions >= 1.9.0.
To prevent over-instrumentation, the OpenLineage integration provides a circuit breaker mechanism that stops OpenLineage from creating, serializing, and sending OpenLineage events.
Simple Memory Circuit Breaker
A simple circuit breaker that works solely on the amount of free memory within the JVM. The configuration should
contain a free memory threshold (percentage). The default value is 20%. The circuit breaker
closes on the first call if free memory is low. The circuitCheckIntervalInMillis parameter
configures how often the circuit breaker is checked. The default value is 1000ms when there is no entry in the config.
timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout
is reached (added in version 1.13).
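As a rough illustration of the memory check described above, the sketch below computes the percentage of heap still available to the JVM and compares it with the threshold. It is a minimal sketch, not the client's actual implementation: the class `SimpleMemoryCheckSketch`, its method names, and the exact formula (counting heap not yet claimed from the operating system as free) are assumptions made for this example.

```java
/** Illustrative sketch only; the formula and all names are assumptions. */
final class SimpleMemoryCheckSketch {

  /** Percentage of the maximum heap that is still available to the JVM. */
  static double freeMemoryPercent(long freeBytes, long totalBytes, long maxBytes) {
    // Heap not yet claimed from the OS (max - total) is counted as free too.
    return 100.0 * (freeBytes + (maxBytes - totalBytes)) / maxBytes;
  }

  /** Returns true when the breaker closes, i.e. OpenLineage work is skipped. */
  static boolean isClosed(double freePercent, int memoryThreshold) {
    return freePercent < memoryThreshold;
  }

  public static void main(String[] args) {
    Runtime rt = Runtime.getRuntime();
    double percent = freeMemoryPercent(rt.freeMemory(), rt.totalMemory(), rt.maxMemory());
    // 20 mirrors the default memoryThreshold mentioned in the text.
    System.out.println("free=" + percent + "% closed=" + isClosed(percent, 20));
  }
}
```

In a real integration such a check would run at most once per circuitCheckIntervalInMillis rather than on every call.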
- Yaml Config
- Spark Config
- Flink Config
circuitBreaker:
  type: simpleMemory
  memoryThreshold: 20
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
Parameter | Definition | Example |
---|---|---|
spark.openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking circuit breaker | 1000 |
spark.openlineage.circuitBreaker.timeoutInSeconds | Optional timeout for OpenLineage execution (Since version 1.13) | 90 |
Parameter | Definition | Example |
---|---|---|
openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking circuit breaker | 1000 |
openlineage.circuitBreaker.timeoutInSeconds | Optional timeout for OpenLineage execution (Since version 1.13) | 90 |
Java Runtime Circuit Breaker
A more complex version of the circuit breaker. The amount of free memory can be low as long as
the amount of time spent on garbage collection is acceptable. JavaRuntimeCircuitBreaker closes
when free memory drops below the threshold and the amount of time spent on garbage collection exceeds
the given threshold (10% by default). The circuit breaker is always open when checked for the first time,
because the GC threshold is computed since the previous circuit breaker call.
The circuitCheckIntervalInMillis parameter configures how often the circuit breaker is checked.
The default value is 1000ms when there is no entry in the config.
timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout
is reached (added in version 1.13).
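The combined condition described above (low free memory and a high share of time spent in garbage collection since the previous check) can be sketched with the standard java.lang.management API. This is a simplified illustration, not the JavaRuntimeCircuitBreaker source: the class `GcAwareCheckSketch`, its method names, and the use of wall-clock time as the denominator are assumptions made for this example.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Illustrative sketch only; names and the time base are assumptions. */
final class GcAwareCheckSketch {

  /** Total time in milliseconds the JVM's collectors have reported so far. */
  static long totalGcMillis() {
    long total = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
      // getCollectionTime() returns -1 when the collector does not report it.
      total += Math.max(0, gc.getCollectionTime());
    }
    return total;
  }

  /** Share of elapsed time spent in GC between two checks, in percent. */
  static double gcPercent(long prevGcMillis, long currGcMillis,
                          long prevCheckMillis, long currCheckMillis) {
    long elapsed = currCheckMillis - prevCheckMillis;
    return elapsed <= 0 ? 0.0 : 100.0 * (currGcMillis - prevGcMillis) / elapsed;
  }

  /** Closes only when memory is low AND the GC share exceeds its threshold. */
  static boolean isClosed(double freeMemoryPercent, int memoryThreshold,
                          double gcPercent, int gcCpuThreshold) {
    return freeMemoryPercent < memoryThreshold && gcPercent > gcCpuThreshold;
  }

  public static void main(String[] args) throws InterruptedException {
    long gcBefore = totalGcMillis();
    long checkBefore = System.currentTimeMillis();
    Thread.sleep(100);
    double gc = gcPercent(gcBefore, totalGcMillis(), checkBefore, System.currentTimeMillis());
    // 15.0 is a made-up free memory percentage; 20 and 10 mirror the defaults in the text.
    System.out.println("gc=" + gc + "% closed=" + isClosed(15.0, 20, gc, 10));
  }
}
```

Because the GC share is measured relative to the previous check, there is no baseline on the first check, which is consistent with the breaker being open the first time it is consulted.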
- Yaml Config
- Spark Config
- Flink Config
circuitBreaker:
  type: javaRuntime
  memoryThreshold: 20
  gcCpuThreshold: 10
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
Parameter | Definition | Example |
---|---|---|
spark.openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
spark.openlineage.circuitBreaker.gcCpuThreshold | Garbage Collection CPU threshold | 10 |
spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking circuit breaker | 1000 |
spark.openlineage.circuitBreaker.timeoutInSeconds | Optional timeout for OpenLineage execution (Since version 1.13) | 90 |
Parameter | Definition | Example |
---|---|---|
openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
openlineage.circuitBreaker.gcCpuThreshold | Garbage Collection CPU threshold | 10 |
openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking circuit breaker | 1000 |
openlineage.circuitBreaker.timeoutInSeconds | Optional timeout for OpenLineage execution (Since version 1.13) | 90 |
Custom Circuit Breaker
The list of available circuit breakers can be extended with a custom one, loaded via ServiceLoader,
that provides its own implementation of io.openlineage.client.circuitBreaker.CircuitBreakerBuilder.
Metrics
This feature is available in OpenLineage 1.11 and above.
To ease the operational experience of using the OpenLineage integrations, this document details the metrics collected by the Java client and the configuration settings for various metric backends.
Metrics collected by Java Client
The following table outlines the metrics collected by the OpenLineage Java client, which help in monitoring the integration's performance:
Metric | Definition | Type |
---|---|---|
openlineage.emit.start | Number of events the integration started to send | Counter |
openlineage.emit.complete | Number of events the integration completed sending | Counter |
openlineage.emit.time | Time spent on emitting events | Timer |
openlineage.circuitbreaker.engaged | Status of the Circuit Breaker (engaged or not) | Gauge |
Metric Backends
OpenLineage uses Micrometer for metrics collection, similar to how SLF4J operates for logging. Micrometer provides a facade over different metric backends, allowing metrics to be dispatched to various destinations.
Configuring Metric Backends
Below are the available backends and potential configurations using Micrometer's facilities.
StatsD
Full configuration options for StatsD can be found in Micrometer's StatsdConfig implementation.
- Yaml Config
- Spark Config
- Flink Config
metrics:
  type: statsd
  flavor: datadog
  host: localhost
  port: 8125
Parameter | Definition | Example |
---|---|---|
spark.openlineage.metrics.type | Metrics type selected | statsd |
spark.openlineage.metrics.flavor | Flavor of StatsD configuration | datadog |
spark.openlineage.metrics.host | Host that receives StatsD metrics | localhost |
spark.openlineage.metrics.port | Port that receives StatsD metrics | 8125 |
Parameter | Definition | Example |
---|---|---|
openlineage.metrics.type | Metrics type selected | statsd |
openlineage.metrics.flavor | Flavor of StatsD configuration | datadog |
openlineage.metrics.host | Host that receives StatsD metrics | localhost |
openlineage.metrics.port | Port that receives StatsD metrics | 8125 |
Dataset Namespace Resolver
This feature is available in OpenLineage 1.17 and above.
Host addresses are often used to access data, and a single dataset can be accessed via different addresses. For example, a Kafka topic can be accessed through a list of Kafka bootstrap servers or through any single server from that list. In general, this problem can be solved by adding a mechanism that resolves host addresses into a logical identifier understood within the organisation. This applies to all clusters, such as Kafka or Cassandra, which should be identified regardless of the current list of hosts they contain. It also applies to JDBC URLs, where the physical address of a database can change over time.
Host List Resolver
Given a list of hosts, the Host List Resolver replaces the host name within the dataset namespace with the configured resolved name.
- Yaml Config
- Spark Config
- Flink Config
dataset:
  namespaceResolvers:
    resolved-name:
      type: hostList
      hosts: ['kafka-prod13.company.com', 'kafka-prod15.company.com']
      schema: "kafka"
Parameter | Definition | Example |
---|---|---|
spark.openlineage.dataset.namespaceResolvers.resolved-name.type | Resolver type | hostList |
spark.openlineage.dataset.namespaceResolvers.resolved-name.hosts | List of hosts | ['kafka-prod13.company.com', 'kafka-prod15.company.com'] |
spark.openlineage.dataset.namespaceResolvers.resolved-name.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | kafka |
Parameter | Definition | Example |
---|---|---|
openlineage.dataset.namespaceResolvers.resolved-name.type | Resolver type | hostList |
openlineage.dataset.namespaceResolvers.resolved-name.hosts | List of hosts | ['kafka-prod13.company.com', 'kafka-prod15.company.com'] |
openlineage.dataset.namespaceResolvers.resolved-name.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | kafka |
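To make the effect of the host list configuration concrete, here is a minimal, standalone sketch of the replacement it describes. It is not the resolver's actual source: the class `HostListResolverSketch`, the `resolve` signature, and the way the optional schema is matched (as a `schema://` prefix of the namespace) are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; names and schema matching are assumptions. */
final class HostListResolverSketch {

  /** Replaces the first configured host found in the namespace with the resolved name. */
  static String resolve(String namespace, String resolvedName, List<String> hosts, String schema) {
    // When a schema is configured, namespaces with another schema stay untouched.
    if (schema != null && !namespace.startsWith(schema + "://")) {
      return namespace;
    }
    for (String host : hosts) {
      if (namespace.contains(host)) {
        return namespace.replace(host, resolvedName);
      }
    }
    return namespace;
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList("kafka-prod13.company.com", "kafka-prod15.company.com");
    // The port 9092 is a made-up value for the example.
    System.out.println(resolve("kafka://kafka-prod13.company.com:9092", "resolved-name", hosts, "kafka"));
  }
}
```

With the hosts from the configuration above, the namespace kafka://kafka-prod13.company.com:9092 becomes kafka://resolved-name:9092 in this sketch, so both brokers map to the same logical cluster name.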
Pattern Namespace Resolver
A Java regex pattern is used to identify a host. Substrings matching the pattern are replaced with the resolved name.
- Yaml Config
- Spark Config
- Flink Config
dataset:
  namespaceResolvers:
    resolved-name:
      type: pattern
      # 'cassandra-prod7.company.com', 'cassandra-prod8.company.com'
      regex: 'cassandra-prod(\d)+\.company\.com'
      schema: "cassandra"
Parameter | Definition | Example |
---|---|---|
spark.openlineage.dataset.namespaceResolvers.resolved-name.type | Resolver type | pattern |
spark.openlineage.dataset.namespaceResolvers.resolved-name.regex | Regex pattern to find and replace | cassandra-prod(\d)+\.company\.com |
spark.openlineage.dataset.namespaceResolvers.resolved-name.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | cassandra |
Parameter | Definition | Example |
---|---|---|
openlineage.dataset.namespaceResolvers.resolved-name.type | Resolver type | pattern |
openlineage.dataset.namespaceResolvers.resolved-name.regex | Regex pattern to find and replace | cassandra-prod(\d)+\.company\.com |
openlineage.dataset.namespaceResolvers.resolved-name.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | cassandra |
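The pattern-based replacement can be tried out with plain java.util.regex. The sketch below is only an approximation of the behaviour described in this section: the class `PatternResolverSketch` and its `resolve` method are hypothetical, and schema filtering is left out for brevity.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch only; class and method names are hypothetical. */
final class PatternResolverSketch {

  /** Replaces every substring matching the regex with the resolved name. */
  static String resolve(String namespace, String regex, String resolvedName) {
    // quoteReplacement keeps '$' or '\' in the resolved name from being treated as group references.
    return Pattern.compile(regex)
        .matcher(namespace)
        .replaceAll(Matcher.quoteReplacement(resolvedName));
  }

  public static void main(String[] args) {
    String regex = "cassandra-prod(\\d)+\\.company\\.com";
    // The port 9042 is a made-up value for the example.
    System.out.println(resolve("cassandra://cassandra-prod7.company.com:9042", regex, "resolved-name"));
    System.out.println(resolve("cassandra://cassandra-prod8.company.com:9042", regex, "resolved-name"));
  }
}
```

Both example hosts resolve to cassandra://resolved-name:9042 here, which is the point of the resolver: different physical hosts of one cluster end up under a single namespace.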
Pattern Group Namespace Resolver
For this resolver, a Java regex pattern is also used to identify a host. However, instead of a configured resolved name,
the value of matchingGroup is used as the resolved name. This can be useful when you have several clusters
made up of hosts that follow a well-defined host naming convention.
- Yaml Config
- Spark Config
- Flink Config
dataset:
  namespaceResolvers:
    test-pattern:
      type: patternGroup
      # 'cassandra-test-7.company.com', 'cassandra-test-8.company.com', 'kafka-test-7.company.com', 'kafka-test-8.company.com'
      regex: '(?<cluster>[a-zA-Z-]+)-(\d)+\.company\.com:[\d]*'
      matchingGroup: "cluster"
      schema: "cassandra"
Parameter | Definition | Example |
---|---|---|
spark.openlineage.dataset.namespaceResolvers.pattern-group-resolver.type | Resolver type | patternGroup |
spark.openlineage.dataset.namespaceResolvers.pattern-group-resolver.regex | Regex pattern to find and replace | (?<cluster>[a-zA-Z-]+)-(\d)+\.company\.com:[\d]* |
spark.openlineage.dataset.namespaceResolvers.pattern-group-resolver.matchingGroup | Matching group named within the regex | cluster |
spark.openlineage.dataset.namespaceResolvers.pattern-group-resolver.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | kafka |
Parameter | Definition | Example |
---|---|---|
openlineage.dataset.namespaceResolvers.pattern-group-resolver.type | Resolver type | patternGroup |
openlineage.dataset.namespaceResolvers.pattern-group-resolver.regex | Regex pattern to find and replace | (?<cluster>[a-zA-Z-]+)-(\d)+\.company\.com:[\d]* |
openlineage.dataset.namespaceResolvers.pattern-group-resolver.matchingGroup | Matching group named within the regex | cluster |
openlineage.dataset.namespaceResolvers.pattern-group-resolver.schema | Optional schema. The resolver is applied only if the schema matches the configured one. | kafka |
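The difference from the plain pattern resolver is easiest to see by running the regex from the YAML example. The following is a minimal sketch under stated assumptions, not the resolver's real code: `PatternGroupResolverSketch` and `resolve` are hypothetical names, only the first match is replaced, and schema filtering is omitted.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch only; class and method names are hypothetical. */
final class PatternGroupResolverSketch {

  /** Replaces the first regex match with the value captured by the named group. */
  static String resolve(String namespace, String regex, String matchingGroup) {
    Matcher matcher = Pattern.compile(regex).matcher(namespace);
    if (!matcher.find()) {
      return namespace; // nothing to resolve
    }
    return namespace.substring(0, matcher.start())
        + matcher.group(matchingGroup)
        + namespace.substring(matcher.end());
  }

  public static void main(String[] args) {
    String regex = "(?<cluster>[a-zA-Z-]+)-(\\d)+\\.company\\.com:[\\d]*";
    // The ports are made-up values for the example.
    System.out.println(resolve("cassandra://cassandra-test-7.company.com:9042", regex, "cluster"));
    System.out.println(resolve("kafka://kafka-test-8.company.com:9092", regex, "cluster"));
  }
}
```

In this sketch the first namespace resolves to cassandra://cassandra-test and the second to kafka://kafka-test. Because the regex also consumes the port, the port is dropped from the result, and a single resolver entry covers every cluster that follows the naming convention.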
Custom Resolver
A custom resolver can be added by implementing:
- io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolver
- io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolverBuilder
- io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolverConfig
The Config class can be used to pass any namespace resolver parameters through the standard configuration
mechanism (Spark and Flink configuration, or the provided openlineage.yml file). The standard ServiceLoader
approach is used to load and instantiate custom classes.