Transport
Tip: See the current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint using Apache HttpClient.
Configuration
- type - string, must be "http". Required.
- url - string, base URL for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
- urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
- timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
- auth - dictionary specifying authentication options. Optional; by default no authorization is used. If set, requires the type property.
  - type - string specifying one of the out-of-the-box authentication methods (api_key or jwt), or the fully qualified class name of your TokenProvider. Required if auth is provided.
  - Configuration options for api_key authentication:
    - apiKey - string sent in the Authorization HTTP header as a Bearer token. Required if type is api_key.
  - Configuration options for jwt authentication are documented in the JWT Token Provider section.
- headers - dictionary specifying HTTP request headers. Optional.
- compression - string, name of the algorithm used by the HTTP client to compress the request body. Optional, default value: null, allowed values: gzip. Added in v1.13.0.
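To make the relationship between url, endpoint and urlParams concrete, here is a minimal Java sketch that combines them into one request URI. It assumes, per the list above, that endpoint is appended to the path of url (defaulting to /api/v1/lineage) and that urlParams become the query string. HttpTargetSketch and requestUri are hypothetical names for illustration only, not part of the OpenLineage client.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration of how url + endpoint + urlParams could form the
// request URI. This is NOT the OpenLineage client's implementation.
final class HttpTargetSketch {
  // Documented default for the endpoint option.
  static final String DEFAULT_ENDPOINT = "/api/v1/lineage";

  static URI requestUri(String url, String endpoint, Map<String, String> urlParams) {
    String path = (endpoint == null) ? DEFAULT_ENDPOINT : endpoint;
    // Join base URL and endpoint with exactly one slash between them.
    String base = url.replaceAll("/+$", "");
    String suffix = path.startsWith("/") ? path : "/" + path;
    StringBuilder sb = new StringBuilder(base).append(suffix);
    if (urlParams != null && !urlParams.isEmpty()) {
      // Each urlParams entry becomes one URL-encoded key=value query pair.
      StringJoiner query = new StringJoiner("&", "?", "");
      for (Map.Entry<String, String> e : urlParams.entrySet()) {
        query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
            + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
      }
      sb.append(query);
    }
    return URI.create(sb.toString());
  }
}
```

With url http://localhost:5000, the default endpoint, and urlParams param0=value0 and param1=value1 (in insertion order), this yields http://localhost:5000/api/v1/lineage?param0=value0&param1=value1.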
Behavior
Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
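As a rough illustration of this behavior, the sketch below builds (but does not send) such a request with the JDK's java.net.http API instead of Apache HttpClient. It also applies the api_key option as an Authorization: Bearer header, adds custom headers, and optionally gzips the body. LineagePostSketch is a hypothetical name, and announcing a compressed body with a Content-Encoding: gzip header is an assumption, not something stated above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

// Hypothetical JDK-only sketch of the documented request shape.
// The OpenLineage client itself uses Apache HttpClient, not this code.
final class LineagePostSketch {

  // Client whose connect timeout plays the role of timeoutInMillis.
  static HttpClient client(long timeoutInMillis) {
    return HttpClient.newBuilder()
        .connectTimeout(Duration.ofMillis(timeoutInMillis))
        .build();
  }

  // Builds a POST whose body is the JSON-serialized event.
  static HttpRequest buildRequest(URI target, String eventJson, String apiKey,
      Map<String, String> headers, boolean compressGzip) {
    byte[] body = eventJson.getBytes(StandardCharsets.UTF_8);
    HttpRequest.Builder builder = HttpRequest.newBuilder(target)
        .header("Content-Type", "application/json");
    if (apiKey != null) {
      // api_key auth: the key travels as a Bearer token.
      builder.header("Authorization", "Bearer " + apiKey);
    }
    // Extra headers from the `headers` option.
    headers.forEach(builder::header);
    if (compressGzip) {
      // Assumption: compressed bodies are announced via Content-Encoding.
      body = gzip(body);
      builder.header("Content-Encoding", "gzip");
    }
    return builder.POST(HttpRequest.BodyPublishers.ofByteArray(body)).build();
  }

  // Compresses a byte array with the JDK's gzip implementation.
  static byte[] gzip(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

Sending would then be a matter of calling client(5000).send(request, HttpResponse.BodyHandlers.discarding()) against a running endpoint.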
Examples
- YAML Config
- Spark Config
- Flink Config
- Java Code
YAML Config
Anonymous connection:
transport:
  type: http
  url: http://localhost:5000
With authorization:
transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    apiKey: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
  compression: gzip
Spark Config
Anonymous connection:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
With authorization:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1/lineage
spark.openlineage.transport.urlParams.param0=value0
spark.openlineage.transport.urlParams.param1=value1
spark.openlineage.transport.timeoutInMillis=5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
spark.openlineage.transport.headers.X-Some-Extra-Header=abc
spark.openlineage.transport.compression=gzip
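The flat spark.openlineage.transport.* keys mirror the nested YAML structure: for instance, the urlParams.param0 and urlParams.param1 keys correspond to the urlParams dictionary, and headers.X-Some-Extra-Header to the headers dictionary. A minimal sketch of that correspondence follows; FlatConfigSketch is a hypothetical helper, not the Spark integration's actual config parser.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: gathers every flat key under a prefix into one map,
// the way "...transport.urlParams.*" maps onto the YAML urlParams dictionary.
final class FlatConfigSketch {
  static Map<String, String> section(Map<String, String> flat, String prefix) {
    Map<String, String> nested = new TreeMap<>(); // sorted for stable output
    for (Map.Entry<String, String> e : flat.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        // Strip the prefix so "...urlParams.param0" becomes "param0".
        nested.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return nested;
  }
}
```

Applied to the full example above with the prefix spark.openlineage.transport.urlParams., it returns {param0=value0, param1=value1}.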
With SSL context:
spark.openlineage.transport.sslContext.storePassword=...
spark.openlineage.transport.sslContext.keyPassword=...
spark.openlineage.transport.sslContext.keyStoreType=...
spark.openlineage.transport.sslContext.keyStorePath=...
Here, keyStorePath is the location of the keystore file, storePassword is the keystore password, and keyStoreType is the keystore type. keyPassword, the password of the key, should also be set.
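On the JVM, these four values are the inputs needed to load a keystore and build an SSLContext. The sketch below shows that standard JSSE sequence (KeyStore, KeyManagerFactory, SSLContext) under the assumption that the keystore supplies client key material and default trust managers are used. SslContextSketch is a hypothetical name; this is not the OpenLineage client's code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

// Hypothetical JDK-only sketch: turning the sslContext settings into an SSLContext.
final class SslContextSketch {
  static SSLContext fromKeyStore(Path keyStorePath, String keyStoreType,
      char[] storePassword, char[] keyPassword)
      throws IOException, GeneralSecurityException {
    // keyStoreType selects the format, e.g. "PKCS12" or "JKS".
    KeyStore keyStore = KeyStore.getInstance(keyStoreType);
    try (InputStream in = Files.newInputStream(keyStorePath)) {
      keyStore.load(in, storePassword); // storePassword unlocks the keystore file
    }
    KeyManagerFactory kmf =
        KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, keyPassword); // keyPassword unlocks the private key entries
    SSLContext context = SSLContext.getInstance("TLS");
    // null trust managers and random source mean the JDK defaults are used.
    context.init(kmf.getKeyManagers(), null, null);
    return context;
  }
}
```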
URL parsing within Spark integration
You can also supply HTTP parameters through the URL itself. The Spark integration parses the URL into spark.openlineage.* properties as follows:
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
For example:
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
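The mapping above can be traced with a small Java sketch that splits such a URL into properties. The property names are read directly off the template and prefixed with spark.openlineage.; the parsing details (regex on the path, UTF-8 decoding, unknown query keys treated as urlParams) are assumptions for illustration. SparkUrlMappingSketch is hypothetical, and the integration's real parser may differ.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the documented URL template; not the integration's parser.
final class SparkUrlMappingSketch {
  // {endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}
  private static final Pattern PATH =
      Pattern.compile("^(.*)/namespaces/([^/]+)/jobs/([^/]+)/runs/([^/]+)$");

  static Map<String, String> toProperties(String url) {
    URI uri = URI.create(url);
    Map<String, String> props = new LinkedHashMap<>();
    // {transport.url}: scheme plus host and port.
    props.put("spark.openlineage.transport.url", uri.getScheme() + "://" + uri.getRawAuthority());
    Matcher m = PATH.matcher(uri.getRawPath());
    if (!m.matches()) {
      throw new IllegalArgumentException("unexpected path: " + uri.getRawPath());
    }
    props.put("spark.openlineage.transport.endpoint", m.group(1));
    props.put("spark.openlineage.namespace", decode(m.group(2)));
    props.put("spark.openlineage.parentJobName", decode(m.group(3)));
    props.put("spark.openlineage.parentRunId", decode(m.group(4)));
    if (uri.getRawQuery() != null) {
      for (String pair : uri.getRawQuery().split("&")) {
        String[] kv = pair.split("=", 2);
        props.put(propertyFor(decode(kv[0])), kv.length > 1 ? decode(kv[1]) : "");
      }
    }
    return props;
  }

  // Known query keys get dedicated properties; any other key becomes a urlParam.
  private static String propertyFor(String queryKey) {
    switch (queryKey) {
      case "app_name": return "spark.openlineage.appName";
      case "api_key": return "spark.openlineage.transport.apiKey";
      case "timeout": return "spark.openlineage.transport.timeout";
      default: return "spark.openlineage.transport.urlParams." + queryKey;
    }
  }

  private static String decode(String s) {
    return URLDecoder.decode(s, StandardCharsets.UTF_8);
  }
}
```

For the example URL, this sketch produces transport.url http://localhost:5000, transport.endpoint /api/v1, namespace ns_name, parentJobName job_name, appName app, transport.apiKey abc, transport.timeout 5000 and transport.urlParams.xxx xxx.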
Flink Config
Anonymous connection:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
With authorization:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.endpoint=/api/v1/lineage
openlineage.transport.urlParams.param0=value0
openlineage.transport.urlParams.param1=value1
openlineage.transport.timeoutInMillis=5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
openlineage.transport.headers.X-Some-Extra-Header=abc
openlineage.transport.compression=gzip
With SSL context:
openlineage.transport.sslContext.storePassword=...
openlineage.transport.sslContext.keyPassword=...
openlineage.transport.sslContext.keyStoreType=...
openlineage.transport.sslContext.keyStorePath=...
Here, keyStorePath is the location of the keystore file, storePassword is the keystore password, and keyStoreType is the keystore type. keyPassword, the password of the key, should also be set.
Java Code
Anonymous connection:
import java.net.URI;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
// Only the base URL is required; the default endpoint /api/v1/lineage is used.
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl(URI.create("http://localhost:5000"));
OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
With authorization:
import java.net.URI;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
// The API key is sent as a Bearer token in the Authorization header.
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl(URI.create("http://localhost:5000"));
httpConfig.setAuth(apiKeyTokenProvider);
OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
Full example:
import java.net.URI;
import java.util.Map;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
// Query parameters added to every request.
Map<String, String> queryParams = Map.of(
    "param0", "value0",
    "param1", "value1"
);
// Extra HTTP headers sent with every request.
Map<String, String> headers = Map.of(
    "X-Some-Extra-Header", "abc"
);
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl(URI.create("http://localhost:5000"));
httpConfig.setEndpoint("/api/v1/lineage");
httpConfig.setUrlParams(queryParams);
httpConfig.setAuth(apiKeyTokenProvider);
httpConfig.setTimeoutInMillis(5000);
httpConfig.setHeaders(headers);
// Compress request bodies with gzip.
httpConfig.setCompression(HttpConfig.Compression.GZIP);
OpenLineageClient client = OpenLineageClient.builder()
    .transport(new HttpTransport(httpConfig))
    .build();
With SSL context:
httpConfig.setSslContextConfig(new HttpSslContextConfig(keyStorePassword, keyPassword, keyStoreType, keyStoreFileName));
Here, keyStoreFileName is the location of the keystore file, keyStorePassword is the keystore password, and keyStoreType is the keystore type. keyPassword, the password of the key, should also be provided.
JWT Token Provider
The JwtTokenProvider is an authentication provider that exchanges an API key for a JWT by sending a POST request to a token endpoint. This is useful for services that require OAuth-style authentication, where a token must be obtained before making API requests.
Configuration
When using JWT authentication with HTTP transport, configure the auth section as follows:
- type - string, must be "jwt". Required.
- apiKey - string, the API key used to obtain the JWT. Required.
- tokenEndpoint - string, the URL endpoint for token generation. Required.
- tokenFields - array of strings, JSON field names to search for the token in the response. The provider tries each field in order. Optional, default: ["token", "access_token"].
- expiresInField - string, JSON field name containing the token expiration time in seconds. Optional, default: "expires_in".
- grantType - string, OAuth grant type parameter sent in the token request. Optional, default: "urn:ietf:params:oauth:grant-type:jwt-bearer".
- responseType - string, OAuth response type parameter sent in the token request. Optional, default: "token".
- tokenRefreshBuffer - integer, number of seconds before token expiry to trigger a refresh. Optional, default: 120.
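Two of the options above describe logic that is easy to misread: tokenFields is searched in order, and tokenRefreshBuffer moves the refresh point earlier than the actual expiry. The Java sketch below illustrates both under stated assumptions: the token response is taken as an already-parsed map (JSON parsing and the POST itself are omitted), and expiry is measured from the moment the token was issued. JwtRefreshSketch and its methods are hypothetical, not the real JwtTokenProvider.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of documented JwtTokenProvider behaviour; not the real class.
final class JwtRefreshSketch {
  // Documented default for tokenFields.
  static final List<String> DEFAULT_TOKEN_FIELDS = List.of("token", "access_token");

  // `response` stands in for the parsed JSON body returned by tokenEndpoint.
  // Returns the value of the first field in tokenFields that holds a string.
  static Optional<String> extractToken(Map<String, ?> response, List<String> tokenFields) {
    for (String field : tokenFields) {
      Object value = response.get(field);
      if (value instanceof String) {
        return Optional.of((String) value);
      }
    }
    return Optional.empty();
  }

  // Expiry instant from the seconds found in the expiresInField entry.
  static Instant expiresAt(Instant issuedAt, long expiresInSeconds) {
    return issuedAt.plusSeconds(expiresInSeconds);
  }

  // True once `now` is within refreshBufferSeconds of the expiry instant.
  static boolean needsRefresh(Instant now, Instant expiresAt, long refreshBufferSeconds) {
    return !now.isBefore(expiresAt.minus(Duration.ofSeconds(refreshBufferSeconds)));
  }
}
```

With expires_in of 3600 and the default buffer of 120, a token issued at time t0 would be refreshed from t0 + 3480 seconds onward.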