
Configuration

We recommend configuring the client with an openlineage.yml file that contains all the details of how to connect to your OpenLineage backend.

See example configurations.

You can make this file available to the client in three ways (the list also presents precedence of the configuration):

  1. Set an OPENLINEAGE_CONFIG environment variable to a file path: OPENLINEAGE_CONFIG=path/to/openlineage.yml.
  2. Place an openlineage.yml in the user's current working directory.
  3. Place an openlineage.yml under .openlineage/ in the user's home directory (~/.openlineage/openlineage.yml).
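
The lookup order above can be sketched in Java as follows. This is an illustration only, not the client's actual implementation: the class and method names are hypothetical, and falling through to the next location when OPENLINEAGE_CONFIG points at a missing file is an assumption.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical helper that mirrors the documented precedence of openlineage.yml locations. */
class ConfigLocator {

  /**
   * Returns the first candidate file that exists, checked in the documented order.
   * The environment, directories, and existence check are parameters so the logic is easy to test.
   */
  static Optional<Path> locate(
      Map<String, String> env, Path workingDir, Path homeDir, Predicate<Path> exists) {
    // 1. Explicit path from the OPENLINEAGE_CONFIG environment variable.
    String explicit = env.get("OPENLINEAGE_CONFIG");
    if (explicit != null && !explicit.isEmpty()) {
      Path fromEnv = Paths.get(explicit);
      if (exists.test(fromEnv)) {
        return Optional.of(fromEnv);
      }
      // Assumption: a missing file falls through to the next location.
    }
    // 2. openlineage.yml in the current working directory.
    Path inWorkingDir = workingDir.resolve("openlineage.yml");
    if (exists.test(inWorkingDir)) {
      return Optional.of(inWorkingDir);
    }
    // 3. ~/.openlineage/openlineage.yml
    Path inHome = homeDir.resolve(".openlineage").resolve("openlineage.yml");
    if (exists.test(inHome)) {
      return Optional.of(inHome);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Optional<Path> found = locate(
        System.getenv(),
        Paths.get(System.getProperty("user.dir")),
        Paths.get(System.getProperty("user.home")),
        Files::exists);
    System.out.println(found.map(Path::toString).orElse("no openlineage.yml found"));
  }
}
```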

Environment Variables

The following environment variables are available:

Name | Description | Since
--- | --- | ---
OPENLINEAGE_CONFIG | The path to the YAML configuration file. Example: path/to/openlineage.yml |
OPENLINEAGE_DISABLED | When true, OpenLineage will not emit events. | 0.9.0

You can also configure the client with dynamic environment variables.

Configuring OpenLineage Client via Dynamic Environment Variables

These environment variables must begin with OPENLINEAGE__, followed by sections of the configuration separated by a double underscore __. All values in the environment variables are automatically converted to lowercase, and variable names using snake_case (single underscore) are converted into camelCase within the final configuration.

Key Features

  1. Prefix Requirement: All environment variables must begin with OPENLINEAGE__.
  2. Sections Separation: Configuration sections are separated using double underscores __ to form the hierarchy.
  3. Lowercase Conversion: Environment variable values are automatically converted to lowercase.
  4. CamelCase Conversion: Any environment variable name using single underscore _ will be converted to camelCase in the final configuration.
  5. JSON String Support: You can pass a JSON string at any level of the configuration hierarchy, which will be merged into the final configuration structure.
  6. Hyphen Restriction: You cannot use - in environment variable names. If a name strictly requires a hyphen, use a JSON string as the value of the environment variable.
  7. Precedence Rules:
  • Top-level keys have precedence and will not be overwritten by more nested entries.
  • For example, OPENLINEAGE__TRANSPORT='{..}' will not have its keys overwritten by OPENLINEAGE__TRANSPORT__AUTH__KEY='key'.

Examples

Setting the following environment variables:

OPENLINEAGE__TRANSPORT__TYPE=http
OPENLINEAGE__TRANSPORT__URL=http://localhost:5050
OPENLINEAGE__TRANSPORT__ENDPOINT=/api/v1/lineage
OPENLINEAGE__TRANSPORT__AUTH__TYPE=api_key
OPENLINEAGE__TRANSPORT__AUTH__API_KEY=random_token
OPENLINEAGE__TRANSPORT__COMPRESSION=gzip

is equivalent to passing the following YAML configuration:

transport:
  type: http
  url: http://localhost:5050
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    apiKey: random_token
  compression: gzip
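
The mapping from variable names to configuration keys can be sketched in Java as follows. This is a simplified illustration, not the client's parser: the class name is hypothetical, values are copied unchanged, and JSON-string values are not merged. Because shorter names are processed first, a nested variable never overwrites a value already set at a higher level, in the spirit of the precedence rule above.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: turns OPENLINEAGE__A__B_C=value into {a: {bC: value}}. */
class DynamicEnvConfig {
  static final String PREFIX = "OPENLINEAGE__";

  /** Converts a snake_case section such as API_KEY into camelCase (apiKey). */
  static String toCamelCase(String section) {
    String[] words = section.toLowerCase(Locale.ROOT).split("_");
    StringBuilder sb = new StringBuilder(words.length > 0 ? words[0] : "");
    for (int i = 1; i < words.length; i++) {
      if (words[i].isEmpty()) {
        continue;
      }
      sb.append(Character.toUpperCase(words[i].charAt(0))).append(words[i].substring(1));
    }
    return sb.toString();
  }

  /** Builds a nested map from all variables that start with the OPENLINEAGE__ prefix. */
  @SuppressWarnings("unchecked")
  static Map<String, Object> parse(Map<String, String> env) {
    Map<String, Object> root = new LinkedHashMap<>();
    // Sorted iteration processes shorter (higher-level) names before their nested variants.
    for (Map.Entry<String, String> e : new TreeMap<>(env).entrySet()) {
      String name = e.getKey();
      if (!name.startsWith(PREFIX)) {
        continue;
      }
      String[] sections = name.substring(PREFIX.length()).split("__");
      if (sections.length == 0) {
        continue;
      }
      Map<String, Object> node = root;
      for (int i = 0; i < sections.length - 1 && node != null; i++) {
        Object child = node.computeIfAbsent(
            toCamelCase(sections[i]), k -> new LinkedHashMap<String, Object>());
        // A plain value already set at this level wins; the nested entry is skipped.
        node = (child instanceof Map) ? (Map<String, Object>) child : null;
      }
      if (node != null) {
        node.put(toCamelCase(sections[sections.length - 1]), e.getValue());
      }
    }
    return root;
  }

  public static void main(String[] args) {
    System.out.println(parse(System.getenv()));
  }
}
```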

Facets Configuration

In the YAML configuration file, you can also disable facets to filter them out of the OpenLineage event.

YAML Configuration

transport:
  type: console
facets:
  spark_unknown:
    disabled: true
  spark:
    logicalPlan:
      disabled: true

Deprecated syntax

The following syntax is deprecated and will soon be removed:

transport:
  type: console
facets:
  disabled:
    - spark_unknown
    - spark.logicalPlan

The rationale behind the deprecation is that some facets were disabled by default in some integrations. When extra facets were added to the disabled list without repeating the defaults, the facets disabled by default were unintentionally enabled.

Transports

Tip: See current list of all supported transports.

HTTP

Allows sending events to an HTTP endpoint, using ApacheHTTPClient.

Configuration

  • type - string, must be "http". Required.
  • url - string, base url for HTTP requests. Required.
  • endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
  • urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
  • timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
  • auth - dictionary specifying authentication options. Optional, by default no authorization is used. If set, requires the type property.
    • type - string specifying "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
    • apiKey - string used as the Bearer token in the Authorization HTTP header. Required if type is api_key.
  • headers - dictionary specifying HTTP request headers. Optional.
  • compression - string, name of algorithm used by HTTP client to compress request body. Optional, default value null, allowed values: gzip. Added in v1.13.0.

Behavior

Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
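
As a rough illustration of that request shape, the sketch below builds (but does not send) an equivalent request with the JDK's java.net.http API rather than the Apache client used by the transport. The class and method names are hypothetical, and the way url and endpoint are joined is an assumption.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the HTTP request described above, built with the JDK client API. */
class HttpEmitSketch {

  /** Builds a POST request carrying the serialized event; apiKey may be null for anonymous use. */
  static HttpRequest buildRequest(String url, String endpoint, String apiKey, String eventJson) {
    // The endpoint is appended to the base url; normalize the slash between them.
    String base = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
    String path = endpoint.startsWith("/") ? endpoint : "/" + endpoint;

    HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(base + path))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(eventJson, StandardCharsets.UTF_8));
    if (apiKey != null) {
      // With auth.type set to api_key, the key is sent as a Bearer token.
      builder.header("Authorization", "Bearer " + apiKey);
    }
    return builder.build();
  }

  public static void main(String[] args) {
    HttpRequest request =
        buildRequest("http://localhost:5000", "/api/v1/lineage", null, "{\"eventType\":\"START\"}");
    System.out.println(request.method() + " " + request.uri());
  }
}
```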

Examples

Anonymous connection:

transport:
  type: http
  url: http://localhost:5000

With authorization:

transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5

Full example:

transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
  compression: gzip

Kafka

If the transport type is set to kafka, the parameters below are read and used when building the KafkaProducer. This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.

Configuration

  • type - string, must be "kafka". Required.

  • topicName - string specifying the topic to which events will be sent. Required.

  • properties - a dictionary of Kafka producer settings, as described in the Kafka producer config documentation. Required.

  • localServerId - deprecated, renamed to messageKey since v1.13.0.

  • messageKey - string, key for all Kafka messages produced by transport. Optional, default value described below. Added in v1.13.0.

    Default values for messageKey are:

    • run:{parentJob.namespace}/{parentJob.name} - for RunEvent with parent facet
    • run:{job.namespace}/{job.name} - for RunEvent
    • job:{job.namespace}/{job.name} - for JobEvent
    • dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
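
The default key formats listed above can be expressed as a small Java helper. This is a sketch only: the class and method names are hypothetical, and plain strings stand in for the client's event model.

```java
/** Hypothetical sketch of the default messageKey formats listed above. */
class MessageKeySketch {

  /** Key for a RunEvent; the parent job is used when a parent facet is present. */
  static String runKey(
      String jobNamespace, String jobName, String parentNamespace, String parentName) {
    if (parentNamespace != null && parentName != null) {
      return "run:" + parentNamespace + "/" + parentName;
    }
    return "run:" + jobNamespace + "/" + jobName;
  }

  /** Key for a JobEvent. */
  static String jobKey(String jobNamespace, String jobName) {
    return "job:" + jobNamespace + "/" + jobName;
  }

  /** Key for a DatasetEvent. */
  static String datasetKey(String datasetNamespace, String datasetName) {
    return "dataset:" + datasetNamespace + "/" + datasetName;
  }

  public static void main(String[] args) {
    // A Spark job started by an Airflow task shares the key of its parent job.
    System.out.println(runKey("spark", "my_app", "airflow", "my_dag.my_task"));
    System.out.println(runKey("airflow", "my_dag.my_task", null, null));
  }
}
```

With Kafka's default partitioner, messages that share a key go to the same partition, so a child run keyed by its parent job stays ordered relative to the parent's own events.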

Behavior

Events are serialized to JSON, and then dispatched to the Kafka topic.

Notes

It is recommended to provide messageKey if a job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, such as Airflow task -> Spark application -> Spark task runs.

Examples

transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  messageKey: some-value

Notes: It is recommended to provide messageKey if Job hierarchy is used. It can be any string, but it should be the same for all jobs in hierarchy, like Airflow task -> Spark application.

Default values are:

  • run:{parentJob.namespace}/{parentJob.name}/{parentRun.id} - for RunEvent with parent facet
  • run:{job.namespace}/{job.name}/{run.id} - for RunEvent
  • job:{job.namespace}/{job.name} - for JobEvent
  • dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent

Kinesis

If the transport type is set to kinesis, the parameters below are read and used when building the KinesisProducer. KinesisTransport also requires the artifact com.amazonaws:amazon-kinesis-producer:0.14.0 (or compatible) on your classpath.

Configuration

  • type - string, must be "kinesis". Required.
  • streamName - the name of the Kinesis stream. Required.
  • region - the region of the Kinesis stream. Required.
  • roleArn - the ARN of the role that is allowed to read from and write to the Kinesis stream. Optional.
  • properties - a dictionary of properties allowed by the Kinesis producer. Optional.

Behavior

  • Events are serialized to JSON, and then dispatched to the Kinesis stream.
  • The partition key is generated as {jobNamespace}:{jobName}.
  • Two constructors are available: one accepting both KinesisProducer and KinesisConfig and another solely accepting KinesisConfig.

Examples

transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name
  properties:
    VerifyCertificate: true
    ConnectTimeout: 6000

Console

This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.

Behavior

Events are serialized to JSON. Each event is then logged at INFO level to the logger named ConsoleTransport.

Notes

Be cautious when using the DEBUG log level, as it might result in double-logging due to the OpenLineageClient also logging.

Configuration

  • type - string, must be "console". Required.

Examples

transport:
  type: console

File

Designed mainly for integration testing, the FileTransport emits OpenLineage events to a given file.

Configuration

  • type - string, must be "file". Required.
  • location - string specifying the path of the file. Required.

Behavior

  • If the target file is absent, it's created.
  • Events are serialized to JSON, and then appended to a file, separated by newlines.
  • Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
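
The behavior above can be approximated in Java as follows. This is a sketch, not the FileTransport source: the class name is hypothetical, and stripping carriage returns in addition to line feeds is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: appends each event to a file as a single line of JSON. */
class FileEmitSketch {

  static void emit(Path location, String eventJson) throws IOException {
    // Remove newline characters inside the event so it occupies exactly one line.
    String oneLine = eventJson.replace("\r", "").replace("\n", "");
    Files.write(
        location,
        (oneLine + "\n").getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE,   // create the file if it is absent
        StandardOpenOption.APPEND);  // otherwise append to the existing content
  }

  public static void main(String[] args) throws IOException {
    Path location = Paths.get(args.length > 0 ? args[0] : "openlineage-events.json");
    emit(location, "{\n  \"eventType\": \"START\"\n}");
    emit(location, "{\"eventType\": \"COMPLETE\"}");
    System.out.println(Files.readAllLines(location).size() + " line(s) in " + location);
  }
}
```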

Notes for Yarn/Kubernetes

This transport type is of little use for Spark/Flink applications deployed to a Yarn or Kubernetes cluster:

  • Each executor writes the file to the local filesystem of its Yarn container or K8s pod, so the resulting file is removed when that container or pod is destroyed.
  • Kubernetes persistent volumes are not destroyed after pod removal, but all the executors write to the same network disk in parallel, producing a broken file.

Examples

transport:
  type: file
  location: /path/to/your/file

Composite

The CompositeTransport is designed to aggregate multiple transports, allowing event emission to several destinations sequentially. This is useful when events need to be sent to multiple targets, such as a logging system and an API endpoint, one after another in a defined order.

Configuration

  • type - string, must be "composite". Required.
  • transports - may be a list or a map of transport configurations. Required.
  • continueOnFailure - boolean flag, determines if the process should continue even when one of the transports fails. Default is false.

Behavior

  • The configured transports will be initialized and used in sequence to emit OpenLineage events.
  • If continueOnFailure is set to false, a failure in one transport will stop the event emission process, and an exception will be raised.
  • If continueOnFailure is true, the failure will be logged, but the remaining transports will still attempt to send the event.
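
The effect of continueOnFailure can be illustrated with a short Java sketch. The Transport interface here is a hypothetical stand-in that accepts a serialized event; it is not the client's Transport class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sequential emission with an optional continue-on-failure policy. */
class CompositeEmitSketch {

  /** Stand-in for a transport: anything that accepts a serialized event. */
  interface Transport {
    void emit(String eventJson);
  }

  static void emit(List<Transport> transports, boolean continueOnFailure, String eventJson) {
    // Transports are used one after another, in the configured order.
    for (Transport transport : transports) {
      try {
        transport.emit(eventJson);
      } catch (RuntimeException e) {
        if (!continueOnFailure) {
          throw e; // stop the emission and surface the failure
        }
        // Log the failure and let the remaining transports try.
        System.err.println("Transport failed, continuing: " + e.getMessage());
      }
    }
  }

  public static void main(String[] args) {
    List<Transport> transports = new ArrayList<>();
    transports.add(event -> System.out.println("first: " + event));
    transports.add(event -> { throw new IllegalStateException("endpoint unavailable"); });
    transports.add(event -> System.out.println("third: " + event));
    emit(transports, true, "{\"eventType\":\"START\"}");
  }
}
```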

Notes for Multiple Transports

This transport can include a variety of other transport types (e.g., HttpTransport, KafkaTransport, etc.), allowing flexibility in event distribution. Ideal for scenarios where OpenLineage events need to reach multiple destinations for redundancy or different types of processing.

The transports configuration can be provided in two formats:

  1. A list of transport configurations, where each transport may optionally include a name field.
  2. A map of transport configurations, where the key acts as the name for each transport. The map format is particularly useful for configurations set via environment variables or Java properties, providing a more convenient and flexible setup.

Examples

transport:
  type: composite
  continueOnFailure: true
  transports:
    - type: http
      url: http://example.com/api
      name: my_http
    - type: kafka
      topicName: openlineage.events
      properties:
        bootstrap.servers: localhost:9092,another.host:9092
        acks: all
        retries: 3
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
      messageKey: some-value

Dataplex

To use this transport in your project, you need to include the io.openlineage:transports-dataplex artifact in your build configuration. This is particularly important for environments like Spark, where this transport must be on the classpath for lineage events to be emitted correctly.

Configuration

  • type - string, must be "dataplex". Required.
  • endpoint - string, specifies the endpoint to which events are sent, default value is datalineage.googleapis.com:443. Optional.
  • projectId - string, the project quota identifier. If not provided, it is determined based on user credentials. Optional.
  • location - string, Dataplex location. Optional, default: "us".
  • credentialsFile - string, path to the Service Account credentials JSON file. Optional, if not provided Application Default Credentials are used.
  • mode - enum that specifies the type of client used for publishing OpenLineage events to Dataplex. Possible values: sync (synchronous) or async (asynchronous). Optional, default: async.

Behavior

  • Events are serialized to JSON, included as part of a gRPC request, and then dispatched to the Dataplex endpoint.
  • Depending on the mode chosen, requests are sent using either a synchronous or asynchronous client.

Examples

transport:
  type: dataplex
  projectId: your_gcp_project_id
  location: us
  mode: sync
  credentialsFile: path/to/credentials.json

Error Handling via Transport

// Connect to http://localhost:5000
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    HttpTransport.builder()
      .uri("http://localhost:5000")
      .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
      .build())
  .registerErrorHandler(new EmitErrorHandler() {
    @Override
    public void handleError(Throwable throwable) {
      // Handle emit error here
    }
  }).build();

Defining Your Own Transport

OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new MyTransport() {
      @Override
      public void emit(OpenLineage.RunEvent runEvent) {
        // Add emit logic here
      }
    }).build();

Circuit Breakers

Info: This feature is available in OpenLineage versions >= 1.9.0.

To prevent over-instrumentation, the OpenLineage integration provides a circuit breaker mechanism that stops OpenLineage from creating, serializing, and sending OpenLineage events.

Simple Memory Circuit Breaker

A simple circuit breaker that works solely on the amount of free memory within the JVM. The configuration should contain a free memory threshold (as a percentage); the default value is 20%. The circuit breaker closes on the first call if free memory is low. The circuitCheckIntervalInMillis parameter configures how frequently the circuit breaker is called; it defaults to 1000 ms when there is no entry in the config. timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout is reached (added in version 1.13).

circuitBreaker:
  type: simpleMemory
  memoryThreshold: 20
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
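
The memory check can be pictured with the Java sketch below. It is an illustration under assumptions, not the client's code: the class name is hypothetical, and measuring free memory as Runtime.freeMemory() relative to Runtime.totalMemory() is an assumption about how the percentage is computed.

```java
/** Hypothetical sketch of a free-memory check against a percentage threshold. */
class SimpleMemoryCheckSketch {

  /** Free memory as a percentage of the given total. */
  static double freeMemoryPercent(long freeBytes, long totalBytes) {
    return 100.0 * freeBytes / totalBytes;
  }

  /**
   * Returns true when the breaker is "closed" in this document's sense,
   * meaning free memory is below the threshold and OpenLineage work should stop.
   */
  static boolean isClosed(long freeBytes, long totalBytes, int memoryThreshold) {
    return freeMemoryPercent(freeBytes, totalBytes) < memoryThreshold;
  }

  public static void main(String[] args) {
    Runtime runtime = Runtime.getRuntime();
    // 20 matches the documented default for memoryThreshold.
    boolean closed = isClosed(runtime.freeMemory(), runtime.totalMemory(), 20);
    System.out.println("circuit breaker closed: " + closed);
  }
}
```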

Java Runtime Circuit Breaker

A more complex version of the circuit breaker. The amount of free memory can be low as long as the amount of time spent on garbage collection is acceptable. JavaRuntimeCircuitBreaker closes when free memory drops below the threshold and the amount of time spent on garbage collection exceeds the given threshold (10% by default). The circuit breaker is always open when checked for the first time, because the GC threshold is computed since the previous circuit breaker call. The circuitCheckIntervalInMillis parameter configures how frequently the circuit breaker is called; it defaults to 1000 ms when there is no entry in the config. timeoutInSeconds is optional. If set, OpenLineage code execution is terminated when the timeout is reached (added in version 1.13).

circuitBreaker:
  type: javaRuntime
  memoryThreshold: 20
  gcCpuThreshold: 10
  circuitCheckIntervalInMillis: 1000
  timeoutInSeconds: 90
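
The combined condition can be sketched in Java as follows. The class is hypothetical and is not the JavaRuntimeCircuitBreaker source; in particular, treating the GC share as GC time divided by wall-clock time elapsed since the previous check is an assumption.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Hypothetical sketch: closes only when memory is low AND recent GC time is high. */
class GcAwareBreakerSketch {
  private long previousGcMillis = -1; // -1 marks "no previous check yet"
  private long previousCheckMillis;

  /** Total accumulated GC time reported by the JVM, in milliseconds. */
  static long totalGcMillis() {
    long total = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
      long millis = gc.getCollectionTime(); // -1 when the collector does not report it
      if (millis > 0) {
        total += millis;
      }
    }
    return total;
  }

  /** The first call always returns false because there is no earlier measurement to compare with. */
  boolean isClosed(long nowMillis, long gcMillis, double freeMemoryPercent,
                   int memoryThreshold, int gcCpuThreshold) {
    boolean firstCheck = previousGcMillis < 0;
    long elapsed = nowMillis - previousCheckMillis;
    double gcPercent = (firstCheck || elapsed <= 0)
        ? 0.0
        : 100.0 * (gcMillis - previousGcMillis) / elapsed;
    previousGcMillis = gcMillis;
    previousCheckMillis = nowMillis;
    return freeMemoryPercent < memoryThreshold && gcPercent > gcCpuThreshold;
  }

  public static void main(String[] args) {
    Runtime runtime = Runtime.getRuntime();
    double freePercent = 100.0 * runtime.freeMemory() / runtime.totalMemory();
    GcAwareBreakerSketch breaker = new GcAwareBreakerSketch();
    System.out.println(
        breaker.isClosed(System.currentTimeMillis(), totalGcMillis(), freePercent, 20, 10));
  }
}
```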

Custom Circuit Breaker

The list of available circuit breakers can be extended with a custom one, loaded via ServiceLoader, by providing your own implementation of io.openlineage.client.circuitBreaker.CircuitBreakerBuilder.

Metrics

Info: This feature is available in OpenLineage 1.11 and above.

To ease the operational experience of using the OpenLineage integrations, this document details the metrics collected by the Java client and the configuration settings for various metric backends.

Metrics collected by Java Client

The following table outlines the metrics collected by the OpenLineage Java client, which help in monitoring the integration's performance:

Metric | Definition | Type
--- | --- | ---
openlineage.emit.start | Number of events the integration started to send | Counter
openlineage.emit.complete | Number of events the integration completed sending | Counter
openlineage.emit.time | Time spent on emitting events | Timer
openlineage.circuitbreaker.engaged | Status of the Circuit Breaker (engaged or not) | Gauge

Metric Backends

OpenLineage uses Micrometer for metrics collection, similar to how SLF4J operates for logging. Micrometer provides a facade over different metric backends, allowing metrics to be dispatched to various destinations.

Configuring Metric Backends

Below are the available backends and potential configurations using Micrometer's facilities.

StatsD

Full configuration options for StatsD can be found in Micrometer's StatsDConfig implementation.

metrics:
  type: statsd
  flavor: datadog
  host: localhost
  port: 8125

Dataset Namespace Resolver

Info: This feature is available in OpenLineage 1.17 and above.

Host addresses are often used to access data, and a single dataset can be accessed via different addresses. For example, a Kafka topic can be accessed through a list of Kafka bootstrap servers or through any single server from that list. In general, this problem can be solved by adding a mechanism that resolves host addresses into a logical identifier understood within the organisation. This applies to all clusters, such as Kafka or Cassandra, which should be identified regardless of the current list of hosts they contain. It also applies to JDBC URLs, where the physical address of a database can change over time.

Host List Resolver

Given a list of hosts, the Host List Resolver replaces the host name within the dataset namespace with the configured resolved name.

dataset:
  namespaceResolvers:
    resolved-name:
      type: hostList
      hosts: ['kafka-prod13.company.com', 'kafka-prod15.company.com']
      schema: "kafka"
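
The effect of such a resolver on a namespace can be sketched in Java as follows. This is an illustration, not the client's resolver: the class name and the sample namespace are hypothetical, and treating schema as a filter on the namespace's scheme prefix is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of replacing a known host with a logical name in a dataset namespace. */
class HostListResolverSketch {

  static String resolve(String namespace, String resolvedName, List<String> hosts, String schema) {
    // Assumption: when a schema is configured, only namespaces with that scheme are rewritten.
    if (schema != null && !namespace.startsWith(schema + "://")) {
      return namespace;
    }
    for (String host : hosts) {
      if (namespace.contains(host)) {
        return namespace.replace(host, resolvedName);
      }
    }
    return namespace; // no listed host found: leave the namespace unchanged
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList("kafka-prod13.company.com", "kafka-prod15.company.com");
    System.out.println(
        resolve("kafka://kafka-prod13.company.com:9092", "resolved-name", hosts, "kafka"));
  }
}
```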

Pattern Namespace Resolver

A Java regex pattern is used to identify a host. Substrings matching the pattern are replaced with the resolved name.

dataset:
  namespaceResolvers:
    resolved-name:
      type: pattern
      # 'cassandra-prod7.company.com', 'cassandra-prod8.company.com'
      regex: 'cassandra-prod(\d)+\.company\.com'
      schema: "cassandra"

Pattern Group Namespace Resolver

For this resolver, a Java regex pattern is also used to identify a host. However, instead of a configured resolved name, the matchingGroup is used as the resolved name. This can be useful when there are several clusters made up of hosts with a well-defined naming convention.

dataset:
  namespaceResolvers:
    test-pattern:
      type: patternGroup
      # 'cassandra-test-7.company.com', 'cassandra-test-8.company.com', 'kafka-test-7.company.com', 'kafka-test-8.company.com'
      regex: '(?<cluster>[a-zA-Z-]+)-(\d)+\.company\.com:[\d]*'
      matchingGroup: "cluster"
      schema: "cassandra"
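
How a named group turns into the resolved name can be shown with plain java.util.regex. The class below is a hypothetical sketch, not the client's resolver, and the sample namespace is made up for illustration; the schema filter is left out.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: replaces the matched host with the value of a named regex group. */
class PatternGroupResolverSketch {

  static String resolve(String namespace, Pattern regex, String matchingGroup) {
    Matcher matcher = regex.matcher(namespace);
    if (!matcher.find()) {
      return namespace; // nothing matched: leave the namespace unchanged
    }
    // Everything the regex matched is replaced by the captured group.
    return namespace.substring(0, matcher.start())
        + matcher.group(matchingGroup)
        + namespace.substring(matcher.end());
  }

  public static void main(String[] args) {
    // Same regex as in the YAML example above, with backslashes escaped for a Java string.
    Pattern regex = Pattern.compile("(?<cluster>[a-zA-Z-]+)-(\\d)+\\.company\\.com:[\\d]*");
    System.out.println(
        resolve("cassandra://cassandra-test-7.company.com:9042/keyspace", regex, "cluster"));
  }
}
```

Because the example regex also matches the colon and port, the port disappears from the resolved namespace along with the host.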

Custom Resolver

Custom resolver can be added by implementing:

  • io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolver
  • io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolverBuilder
  • io.openlineage.client.dataset.namespaceResolver.DatasetNamespaceResolverConfig

The config class can be used to pass any namespace resolver parameters through the standard configuration mechanism (Spark and Flink configuration, or the provided openlineage.yml file). The standard ServiceLoader approach is used to load and instantiate custom classes.