Transport
Tip: See current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint, using Apache HttpClient.
Configuration
- type - string, must be "http". Required.
- url - string, base url for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
- urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
- timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
- auth - dictionary specifying authentication options. Optional, by default no authorization is used. If set, requires the type property.
  - type - string, either "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
  - apiKey - string sent as the Bearer token in the Authorization HTTP header. Required if type is api_key.
- headers - dictionary specifying HTTP request headers. Optional.
- compression - string, name of the algorithm used by the HTTP client to compress the request body. Optional, default value null, allowed values: gzip. Added in v1.13.0.
Behavior
Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
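To make the request shape concrete, here is a minimal sketch of how url, endpoint, urlParams, the API key and gzip compression could combine into a single POST request. It is not the transport's actual code: it uses the JDK's java.net.http classes rather than Apache HttpClient. The class name HttpRequestSketch and its methods are hypothetical, and the Content-Encoding: gzip header is an assumption about how a compressed body would be announced.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

// Illustrative only: a hypothetical helper, not part of the OpenLineage client.
final class HttpRequestSketch {

  // Appends the endpoint and the URL-encoded query parameters to the base url.
  static URI buildUri(String url, String endpoint, Map<String, String> urlParams) {
    String base = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
    String path = endpoint.startsWith("/") ? endpoint : "/" + endpoint;
    String query = urlParams.entrySet().stream()
        .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
            + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
    return URI.create(base + path + (query.isEmpty() ? "" : "?" + query));
  }

  // Gzip-compresses the request body (what "compression: gzip" asks for).
  static byte[] gzip(byte[] body) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(body);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  // Builds (but does not send) the POST request for one serialized event.
  static HttpRequest buildRequest(URI uri, String eventJson, String apiKey, boolean useGzip) {
    byte[] body = eventJson.getBytes(StandardCharsets.UTF_8);
    HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json");
    if (apiKey != null) {
      builder.header("Authorization", "Bearer " + apiKey);
    }
    if (useGzip) {
      body = gzip(body);
      builder.header("Content-Encoding", "gzip"); // assumption, see lead-in
    }
    return builder.POST(HttpRequest.BodyPublishers.ofByteArray(body)).build();
  }

  public static void main(String[] args) {
    URI uri = buildUri("http://localhost:5000", "/api/v1/lineage", Map.of("param0", "value0"));
    HttpRequest request = buildRequest(uri, "{\"eventType\":\"START\"}", "my-api-key", true);
    System.out.println(request.method() + " " + request.uri());
    request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
  }
}
```

The request is only built, never sent, so the sketch can be run without a server listening on localhost:5000.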
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Yaml Config:
Anonymous connection:
transport:
  type: http
  url: http://localhost:5000
With authorization:
transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
  compression: gzip
Spark Config:
Anonymous connection:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
With authorization:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1/lineage
spark.openlineage.transport.urlParams.param0=value0
spark.openlineage.transport.urlParams.param1=value1
spark.openlineage.transport.timeoutInMillis=5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
spark.openlineage.transport.headers.X-Some-Extra-Header=abc
spark.openlineage.transport.compression=gzip
URL parsing within Spark integration
You can supply HTTP parameters through values embedded in the url. The parsed spark.openlineage.* properties are located in the url as follows:
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
Example:
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
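The layout above can be illustrated by splitting the example url into its parts. The following is only a sketch of the mapping, not the Spark integration's own parser. The class name SparkUrlSketch, the regular expression and the "query." key prefix are all hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a hypothetical helper, not the Spark integration's parser.
final class SparkUrlSketch {

  // {endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}
  private static final Pattern PATH =
      Pattern.compile("^(.*)/namespaces/([^/]+)/jobs/([^/]+)/runs/([^/]+)$");

  // Splits the url into the pieces named in the template above.
  static Map<String, String> parse(String url) {
    URI uri = URI.create(url);
    Matcher m = PATH.matcher(uri.getPath());
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected path layout: " + uri.getPath());
    }
    Map<String, String> parts = new LinkedHashMap<>();
    parts.put("transport.url", uri.getScheme() + "://" + uri.getAuthority());
    parts.put("transport.endpoint", m.group(1));
    parts.put("namespace", m.group(2));
    parts.put("parentJobName", m.group(3));
    parts.put("parentRunId", m.group(4));
    // Query parameters are kept under their raw names, prefixed with "query.".
    String query = uri.getQuery();
    if (query != null) {
      for (String pair : query.split("&")) {
        int eq = pair.indexOf('=');
        String key = eq < 0 ? pair : pair.substring(0, eq);
        String value = eq < 0 ? "" : pair.substring(eq + 1);
        parts.put("query." + key, value);
      }
    }
    return parts;
  }

  public static void main(String[] args) {
    String url = "http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/"
        + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx";
    parse(url).forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```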
Flink Config:
Anonymous connection:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
With authorization:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.endpoint=/api/v1/lineage
openlineage.transport.urlParams.param0=value0
openlineage.transport.urlParams.param1=value1
openlineage.transport.timeoutInMillis=5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
openlineage.transport.headers.X-Some-Extra-Header=abc
openlineage.transport.compression=gzip
Java Code:
Anonymous connection:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;

HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
With authorization:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;

// The API key is sent as a Bearer token.
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");

HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setAuth(apiKeyTokenProvider);

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
Full example:
import java.util.Map;

import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;

// Query parameters appended to every request.
Map<String, String> queryParams = Map.of(
    "param0", "value0",
    "param1", "value1"
);

// Extra HTTP headers sent with every request.
Map<String, String> headers = Map.of(
    "X-Some-Extra-Header", "abc"
);

ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");

HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setEndpoint("/api/v1/lineage");
httpConfig.setUrlParams(queryParams);
httpConfig.setAuth(apiKeyTokenProvider);
httpConfig.setTimeoutInMillis(5000);
httpConfig.setHeaders(headers);
httpConfig.setCompression(HttpConfig.Compression.GZIP);

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
Kafka
If the transport type is set to kafka, the parameters below are read and used when building the KafkaProducer.
This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kafka". Required.
- topicName - string specifying the topic to which events will be sent. Required.
- properties - a dictionary containing the Kafka producer config, as described in the Kafka producer config documentation. Required.
- localServerId - deprecated, renamed to messageKey since v1.13.0.
- messageKey - string, key for all Kafka messages produced by the transport. Optional, default value described below. Added in v1.13.0.
  Default values for messageKey are:
  - run:{parentJob.namespace}/{parentJob.name} - for RunEvent with parent facet
  - run:{job.namespace}/{job.name} - for RunEvent
  - job:{job.namespace}/{job.name} - for JobEvent
  - dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
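As a rough illustration of the default keys listed above (the variants without run ids), the sketch below derives a key from plain strings. The class MessageKeySketch and its method names are hypothetical; the real transport works on OpenLineage event objects rather than bare namespace and name values.

```java
// Illustrative only: a hypothetical helper, not part of the OpenLineage client.
final class MessageKeySketch {

  // RunEvent: prefer the parent job when a parent facet is present.
  static String runKey(String jobNamespace, String jobName,
                       String parentNamespace, String parentName) {
    if (parentNamespace != null && parentName != null) {
      return "run:" + parentNamespace + "/" + parentName;
    }
    return "run:" + jobNamespace + "/" + jobName;
  }

  // JobEvent: keyed by the job itself.
  static String jobKey(String jobNamespace, String jobName) {
    return "job:" + jobNamespace + "/" + jobName;
  }

  // DatasetEvent: keyed by the dataset.
  static String datasetKey(String datasetNamespace, String datasetName) {
    return "dataset:" + datasetNamespace + "/" + datasetName;
  }

  public static void main(String[] args) {
    // An Airflow task without a parent facet, and a Spark application it launched.
    String airflowKey = runKey("airflow", "dag.task", null, null);
    String sparkKey = runKey("spark", "app", "airflow", "dag.task");
    System.out.println(airflowKey);
    System.out.println(sparkKey);
  }
}
```

In this sketch the Spark application's event inherits the key of its parent Airflow task, so both events carry the same Kafka message key.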
Behavior
Events are serialized to JSON, and then dispatched to the Kafka topic.
Notes
It is recommended to provide messageKey if Job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, like Airflow task -> Spark application -> Spark task runs.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Yaml Config:
transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  messageKey: some-value
Spark Config:
spark.openlineage.transport.type=kafka
spark.openlineage.transport.topicName=openlineage.events
spark.openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.properties.acks=all
spark.openlineage.transport.properties.retries=3
spark.openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.messageKey=some-value
Flink Config:
openlineage.transport.type=kafka
openlineage.transport.topicName=openlineage.events
openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.properties.acks=all
openlineage.transport.properties.retries=3
openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.messageKey=some-value
Java Code:
import java.util.Properties;

import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;

// Standard Kafka producer settings, passed through to the KafkaProducer.
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,another.host:9092");
kafkaProperties.setProperty("acks", "all");
kafkaProperties.setProperty("retries", "3");
kafkaProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.setTopicName("openlineage.events");
kafkaConfig.setProperties(kafkaProperties);
// messageKey replaces the deprecated localServerId (since v1.13.0).
kafkaConfig.setMessageKey("some-value");

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new KafkaTransport(kafkaConfig))
    .build();
Notes:
It is recommended to provide messageKey if Job hierarchy is used. It can be any string, but it should be the same for all jobs in the hierarchy, like Airflow task -> Spark application.
Default values are:
- run:{parentJob.namespace}/{parentJob.name}/{parentRun.id} - for RunEvent with parent facet
- run:{job.namespace}/{job.name}/{run.id} - for RunEvent
- job:{job.namespace}/{job.name} - for JobEvent
- dataset:{dataset.namespace}/{dataset.name} - for DatasetEvent
Kinesis
If the transport type is set to kinesis, the parameters below are read and used when building the KinesisProducer.
KinesisTransport also requires the artifact com.amazonaws:amazon-kinesis-producer:0.14.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kinesis". Required.
- streamName - the name of the Kinesis stream. Required.
- region - the region of the Kinesis stream. Required.
- roleArn - the ARN of the role which is allowed to read/write to the Kinesis stream. Optional.
- properties - a dictionary containing Kinesis allowed properties. Optional.
Behavior
- Events are serialized to JSON, and then dispatched to the Kinesis stream.
- The partition key is generated as {jobNamespace}:{jobName}.
- Two constructors are available: one accepting both KinesisProducer and KinesisConfig, and another accepting only KinesisConfig.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Yaml Config:
transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name
  properties:
    VerifyCertificate: true
    ConnectTimeout: 6000
Spark Config:
spark.openlineage.transport.type=kinesis
spark.openlineage.transport.streamName=your_kinesis_stream_name
spark.openlineage.transport.region=your_aws_region
spark.openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
spark.openlineage.transport.properties.VerifyCertificate=true
spark.openlineage.transport.properties.ConnectTimeout=6000
Flink Config:
openlineage.transport.type=kinesis
openlineage.transport.streamName=your_kinesis_stream_name
openlineage.transport.region=your_aws_region
openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
openlineage.transport.properties.VerifyCertificate=true
openlineage.transport.properties.ConnectTimeout=6000
Java Code:
import java.util.Properties;

import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KinesisConfig;
import io.openlineage.client.transports.KinesisTransport;

// Placeholder names: substitute the Kinesis producer properties you need.
Properties kinesisProperties = new Properties();
kinesisProperties.setProperty("property_name_1", "value_1");
kinesisProperties.setProperty("property_name_2", "value_2");

KinesisConfig kinesisConfig = new KinesisConfig();
kinesisConfig.setStreamName("your_kinesis_stream_name");
kinesisConfig.setRegion("your_aws_region");
kinesisConfig.setRoleArn("arn:aws:iam::account-id:role/role-name");
kinesisConfig.setProperties(kinesisProperties);

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new KinesisTransport(kinesisConfig))
    .build();
Console
This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.
Behavior
Events are serialized to JSON, and each event is then logged at INFO level to the logger named ConsoleTransport.
Notes
Be cautious when using the DEBUG log level, as it might result in double-logging due to the OpenLineageClient also logging.
Configuration
- type - string, must be "console". Required.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Yaml Config:
transport:
  type: console
Spark Config:
spark.openlineage.transport.type=console
Flink Config:
openlineage.transport.type=console
Java Code:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ConsoleTransport;

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new ConsoleTransport())
    .build();
File
Designed mainly for integration testing, the FileTransport emits OpenLineage events to a given file.
Configuration
- type - string, must be "file". Required.
- location - string specifying the path of the file. Required.
Behavior
- If the target file is absent, it's created.
- Events are serialized to JSON, and then appended to a file, separated by newlines.
- Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
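The behavior above can be approximated in a few lines. This is a sketch under stated assumptions, not the FileTransport source: it uses java.nio.file directly, and the class FileAppendSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Illustrative only: a hypothetical helper, not part of the OpenLineage client.
final class FileAppendSketch {

  // Strips newlines from the event JSON and appends it as a single line.
  // CREATE makes the file if it is absent; APPEND keeps earlier events.
  static void append(Path location, String eventJson) {
    String oneLine = eventJson.replace("\r", "").replace("\n", "") + "\n";
    try {
      Files.write(location, oneLine.getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the file back, one event per line.
  static List<String> readEvents(Path location) {
    try {
      return Files.readAllLines(location, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path location = Path.of(System.getProperty("java.io.tmpdir"),
        "openlineage-sketch-" + System.nanoTime() + ".json");
    append(location, "{\n  \"eventType\": \"START\"\n}");
    append(location, "{\"eventType\": \"COMPLETE\"}");
    readEvents(location).forEach(System.out::println);
  }
}
```

Because each event occupies exactly one line, the resulting file can be read back line by line and each line parsed as a separate JSON document.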
Notes for Yarn/Kubernetes
This transport type is of little use for Spark/Flink applications deployed to a Yarn or Kubernetes cluster:
- Each executor writes the file to the local filesystem of its Yarn container/K8s pod, so the resulting file is removed when that container/pod is destroyed.
- Kubernetes persistent volumes are not destroyed after pod removal, but all the executors write to the same network disk in parallel, producing a broken file.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Yaml Config:
transport:
  type: file
  location: /path/to/your/file
Spark Config:
spark.openlineage.transport.type=file
spark.openlineage.transport.location=/path/to/your/file
Flink Config:
openlineage.transport.type=file
openlineage.transport.location=/path/to/your/file
Java Code:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.FileConfig;
import io.openlineage.client.transports.FileTransport;

FileConfig fileConfig = new FileConfig("/path/to/your/file");

OpenLineageClient client = OpenLineageClient.builder()
    .transport(new FileTransport(fileConfig))
    .build();