Python
Overview
The Python client is the basis of existing OpenLineage integrations such as Airflow and dbt.
The client enables the creation of lineage metadata events with Python code.
The core data structures currently offered by the client are the RunEvent, RunState, Run, Job, Dataset, and Transport classes. These either configure or collect data for the emission of lineage events.
You can use the client to create your own custom integrations.
Installation
Download the package using pip:
pip install openlineage-python
To install the package from source, use
python -m pip install .
Configuration
We recommend configuring the client with an openlineage.yml file that contains all the details of how to connect to your OpenLineage backend.
You can make this file available to the client in three ways, listed in order of precedence:
- Set an OPENLINEAGE_CONFIG environment variable to a file path: OPENLINEAGE_CONFIG=path/to/openlineage.yml.
- Place an openlineage.yml file in the current working directory (the absolute path of the directory where your script or process is currently running).
- Place an openlineage.yml file under .openlineage/ in the user's home directory (~/.openlineage/openlineage.yml).
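The lookup order above can be sketched with the standard library alone. This illustrates the documented precedence only and is not the client's actual implementation: the function name find_config_path and its parameters are hypothetical, and the sketch assumes that a path given in OPENLINEAGE_CONFIG is used as is, without checking that the file exists.

```python
import os
from pathlib import Path


def find_config_path(environ=None, cwd=None, home=None):
    """Return the first openlineage.yml candidate in precedence order, or None.

    Hypothetical helper: it mirrors the three documented locations and is not
    part of the OpenLineage client API.
    """
    environ = os.environ if environ is None else environ
    cwd = Path.cwd() if cwd is None else Path(cwd)
    home = Path.home() if home is None else Path(home)

    # 1. An explicit path in OPENLINEAGE_CONFIG wins (assumed to be used as given).
    explicit = environ.get("OPENLINEAGE_CONFIG")
    if explicit:
        return Path(explicit)

    # 2. openlineage.yml in the current working directory.
    # 3. ~/.openlineage/openlineage.yml in the user's home directory.
    candidates = (
        cwd / "openlineage.yml",
        home / ".openlineage" / "openlineage.yml",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate

    # Nothing found: the caller falls back to other configuration sources.
    return None
```

Passing the environment and directories as arguments keeps the sketch easy to try against a temporary directory instead of the real home directory.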
In openlineage.yml, use a standard Transport interface to specify the transport type (http, console, kafka, file, or custom) and authorization parameters.
See the example config file for each transport type.
If no config file is found, the OpenLineage client falls back to the environment variables for the HTTP transport.
Finally, if no configuration is found at all, ConsoleTransport is used and events are printed to the console.
Environment Variables
The following environment variables are available to use:
Name | Description | Example | Since |
---|---|---|---|
OPENLINEAGE_CONFIG | The path to the YAML configuration file | path/to/openlineage.yml | |
OPENLINEAGE_CLIENT_LOGGING | Logging level of OpenLineage client and its child modules | DEBUG | |
OPENLINEAGE_DISABLED | When true, OpenLineage will not emit events (default: false) | false | 0.9.0 |
OPENLINEAGE_URL | The URL to send lineage events to (also see OPENLINEAGE_ENDPOINT) | https://myapp.com | |
OPENLINEAGE_ENDPOINT | Endpoint to which events are sent (default: api/v1/lineage) | api/v2/events | |
OPENLINEAGE_API_KEY | Token included in the Authorization HTTP header as a Bearer token | secret_token_123 | |
If you are using the Airflow integration, additional environment variables are available.
Dynamic configuration with environment variables
You can also configure the client with dynamic environment variables.
Environment variables that configure the OpenLineage client follow a specific pattern. All variables that affect the client configuration start with the prefix OPENLINEAGE__, followed by nested keys separated by double underscores (__).
Key Features
- Prefix Requirement: All environment variables must begin with OPENLINEAGE__.
- Sections Separation: Configuration sections are separated using double underscores (__) to form the hierarchy.
- Lowercase Conversion: Environment variable values are automatically converted to lowercase.
- JSON String Support: You can pass a JSON string at any level of the configuration hierarchy, which will be merged into the final configuration structure.
- Hyphen Restriction: Since environment variable names cannot contain - (hyphen), if a name strictly requires a hyphen, use a JSON string as the value of the environment variable.
- Precedence Rules:
  - Top-level keys have precedence and will not be overwritten by more nested entries.
  - For example, OPENLINEAGE__TRANSPORT='{..}' will not have its keys overwritten by OPENLINEAGE__TRANSPORT__AUTH__KEY='key'.
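To make the rules above concrete, here is a minimal sketch of how such variables could be folded into a nested configuration. It is not the client's own parser: the names env_to_config, parse_value, and merge_missing are hypothetical. The sketch lowercases the key segments taken from the variable name, leaves values untouched, and does not convert scalars such as true or 3, so the client's exact conversion behavior is not reproduced here.

```python
import json

PREFIX = "OPENLINEAGE__"


def parse_value(raw):
    """Decode JSON objects and arrays; return anything else unchanged."""
    if raw.strip()[:1] in ("{", "["):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            pass  # not valid JSON after all, keep the raw string
    return raw


def merge_missing(target, source):
    """Copy keys from source into target without overwriting existing keys."""
    for key, value in source.items():
        if key not in target:
            target[key] = value
        elif isinstance(target[key], dict) and isinstance(value, dict):
            merge_missing(target[key], value)


def env_to_config(environ):
    """Build a nested dict from OPENLINEAGE__ variables (illustrative only)."""
    names = [name for name in environ if name.startswith(PREFIX)]
    # Process shallow names first so top-level keys take precedence.
    names.sort(key=lambda name: (len(name.split("__")), name))

    config = {}
    for name in names:
        path = [part.lower() for part in name[len(PREFIX):].split("__")]
        nested = parse_value(environ[name])
        # Wrap the value in one dict per key segment, innermost first.
        for key in reversed(path):
            nested = {key: nested}
        merge_missing(config, nested)
    return config
```

With this sketch, OPENLINEAGE__TRANSPORT='{"type":"console"}' combined with OPENLINEAGE__TRANSPORT__TYPE=http yields a console transport, because the shallower variable is merged first and later entries only fill in missing keys.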
Dynamic Alias for Transport Variables
To facilitate easier management of environment variables, aliases are dynamically created for certain variables like OPENLINEAGE_URL. If OPENLINEAGE_URL is set, it automatically translates into specific transport configurations that can be used with Composite transport with default_http as the name of the HTTP transport.
The alias rules are as follows:
- If the environment variable OPENLINEAGE_URL="http://example.com" is set, the following environment variables are inserted:
OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP__TYPE="http"
OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP__URL="http://example.com"
- Similarly, if the environment variable OPENLINEAGE_API_KEY="random_key" is set, it is translated to:
OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP__AUTH='{"type": "api_key", "apiKey": "random_key"}'
- Equally, the environment variable OPENLINEAGE_ENDPOINT="api/v1/lineage" translates to:
OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP__ENDPOINT="api/v1/lineage"
- If you do not want to use the aliased HTTP transport in the Composite transport, set OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP to {}.
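The alias rules can be expressed as a small translation step over the environment. The sketch below is an illustration, not the client's code: expand_aliases is a hypothetical name, and it assumes that each alias is translated independently and that an explicitly set OPENLINEAGE__... variable is never overwritten by an alias.

```python
import json

DEFAULT_HTTP = "OPENLINEAGE__TRANSPORT__TRANSPORTS__DEFAULT_HTTP"


def expand_aliases(environ):
    """Return a copy of environ with the documented alias variables added."""
    expanded = dict(environ)

    # Opt-out described above: DEFAULT_HTTP set to {} disables the aliases.
    if expanded.get(DEFAULT_HTTP, "").strip() == "{}":
        return expanded

    url = environ.get("OPENLINEAGE_URL")
    if url:
        expanded.setdefault(f"{DEFAULT_HTTP}__TYPE", "http")
        expanded.setdefault(f"{DEFAULT_HTTP}__URL", url)

    api_key = environ.get("OPENLINEAGE_API_KEY")
    if api_key:
        # The auth section is passed as a JSON string, as in the rule above.
        auth = json.dumps({"type": "api_key", "apiKey": api_key})
        expanded.setdefault(f"{DEFAULT_HTTP}__AUTH", auth)

    endpoint = environ.get("OPENLINEAGE_ENDPOINT")
    if endpoint:
        expanded.setdefault(f"{DEFAULT_HTTP}__ENDPOINT", endpoint)

    return expanded
```

The function returns a new dictionary rather than mutating os.environ, which keeps the translation easy to inspect and to test.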
Examples
Basic Example
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=http
OPENLINEAGE__TRANSPORT__URL=http://localhost:5050
OPENLINEAGE__TRANSPORT__ENDPOINT=/api/v1/lineage
OPENLINEAGE__TRANSPORT__AUTH='{"type":"api_key", "apiKey":"random_token"}'
OPENLINEAGE__TRANSPORT__COMPRESSION=gzip
is equivalent to passing the following YAML configuration:
transport:
  type: http
  url: http://localhost:5050
  endpoint: api/v1/lineage
  auth:
    type: api_key
    apiKey: random_token
  compression: gzip
Composite Example
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=composite
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__TYPE=http
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__URL=http://localhost:5050
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__ENDPOINT=/api/v1/lineage
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__AUTH='{"type":"api_key", "apiKey":"random_token"}'
OPENLINEAGE__TRANSPORT__TRANSPORTS__FIRST__COMPRESSION=gzip
OPENLINEAGE__TRANSPORT__TRANSPORTS__SECOND__TYPE=console
is equivalent to passing the following YAML configuration:
transport:
  type: composite
  transports:
    first:
      type: http
      url: http://localhost:5050
      endpoint: api/v1/lineage
      auth:
        type: api_key
        apiKey: random_token
      compression: gzip
    second:
      type: console
Precedence Example
Setting the following environment variables:
OPENLINEAGE__TRANSPORT='{"type":"console"}'
OPENLINEAGE__TRANSPORT__TYPE=http
is equivalent to passing the following YAML configuration:
transport:
  type: console
Kafka Transport Example
Setting the following environment variables:
OPENLINEAGE__TRANSPORT__TYPE=kafka
OPENLINEAGE__TRANSPORT__TOPIC=my_topic
OPENLINEAGE__TRANSPORT__CONFIG='{"bootstrap.servers": "localhost:9092,another.host:9092", "acks": "all", "retries": 3}'
OPENLINEAGE__TRANSPORT__FLUSH=true
OPENLINEAGE__TRANSPORT__MESSAGE_KEY=some-value
is equivalent to passing the following YAML configuration:
transport:
  type: kafka
  topic: my_topic
  config:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
  flush: true
  message_key: some-value # this has been aliased to messageKey
HTTP transport configuration with environment variables
For backwards compatibility, the simplest HTTP transport configuration, covering only a subset of its options, can be done with environment variables (all other transport types are only configurable with a YAML file). This setup uses the following environment variables:
- OPENLINEAGE_URL (required)
- OPENLINEAGE_ENDPOINT (optional, default: api/v1/lineage)
- OPENLINEAGE_API_KEY (optional)
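As a rough sketch of how these three variables map onto an HTTP transport configuration, the hypothetical helper below builds the equivalent of the YAML transport section as a plain dictionary. It is not part of the client API and only reflects the required and optional status and the default stated above.

```python
def http_config_from_env(environ):
    """Build an http transport config dict from the three legacy variables.

    Hypothetical helper for illustration; returns None when the required
    OPENLINEAGE_URL is missing.
    """
    url = environ.get("OPENLINEAGE_URL")
    if not url:
        return None

    config = {
        "type": "http",
        "url": url,
        # Optional, falls back to the documented default endpoint.
        "endpoint": environ.get("OPENLINEAGE_ENDPOINT", "api/v1/lineage"),
    }

    api_key = environ.get("OPENLINEAGE_API_KEY")
    if api_key:
        # Optional: only add an auth section when a key is provided.
        config["auth"] = {"type": "api_key", "apiKey": api_key}
    return config
```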
Built-in Transport Types
HTTP
Allows sending events to an HTTP endpoint, using requests.
Configuration
- type - string, must be "http". Required.
- url - string, base url for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: api/v1/lineage.
- timeout - float specifying timeout (in seconds) value used while connecting to server. Optional, default: 5.
- verify - boolean specifying whether the client should verify TLS certificates from the backend. Optional, default: true.
- auth - dictionary specifying authentication options. Optional, by default no authorization is used. If set, requires the type property.
  - type - string specifying the "api_key" or the fully qualified class name of your TokenProvider. Required if auth is provided.
  - apiKey - string sent in the Authorization HTTP header as the Bearer token. Required if type is api_key.
- compression - string, name of algorithm used by HTTP client to compress request body. Optional, default value null, allowed values: gzip. Added in v1.13.0.
- custom_headers - dictionary of additional headers to be sent with each request. Optional, default: {}.
- retry - dictionary of additional configuration options passed to the urllib3.util.Retry object. Added in v1.33.0. The defaults are listed below; the list is not exhaustive and covers only the options that are set by default. See the urllib3.util.Retry options for the full reference.
  - total - total number of retries to be attempted. Default is 5.
  - read - number of retries to be attempted on read errors. Default is 5.
  - connect - number of retries to be attempted on connection errors. Default is 5.
  - backoff_factor - a backoff factor to apply between attempts after the second try. Default is 0.3.
  - status_forcelist - a set of integer HTTP status codes that we should force a retry on. Default is [500, 502, 503, 504].
  - allowed_methods - a set of HTTP methods that we should retry on. Default is ["HEAD", "POST"].
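To get a feel for what the default retry settings mean in practice, the sketch below computes the sleep time before each retry. It assumes urllib3's documented backoff formula, backoff_factor * 2 ** (number of previous retries), with no delay before the first retry, a cap of 120 seconds, and no jitter. The function name backoff_schedule is hypothetical; the real delays are computed inside urllib3, not by the OpenLineage client.

```python
def backoff_schedule(total=5, backoff_factor=0.3, backoff_max=120.0):
    """Return the sleep time in seconds before each of the `total` retries.

    Illustrative only: mirrors the urllib3 backoff formula under the
    assumptions stated above and ignores jitter.
    """
    delays = []
    for retry_number in range(1, total + 1):
        if retry_number == 1:
            # The first retry happens without any backoff sleep.
            delays.append(0.0)
        else:
            delay = backoff_factor * 2 ** (retry_number - 1)
            delays.append(min(backoff_max, delay))
    return delays
```

Under these assumptions the default values (total 5, backoff_factor 0.3) give waits of 0, 0.6, 1.2, 2.4, and 4.8 seconds, so a backend that keeps failing adds about 9 seconds of sleep on top of the request timeouts.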
Behavior
Events are serialized to JSON and then sent as an HTTP POST request with Content-Type: application/json.
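The request described above can be approximated with the standard library. Note the swap: the transport itself uses requests, while this sketch uses urllib.request so that it runs without extra dependencies, and it only builds the request without sending it. The function name build_request, the sample event fields, and the Content-Encoding header on compressed bodies are assumptions made for illustration.

```python
import gzip
import json
import urllib.request


def build_request(event, url, endpoint="api/v1/lineage", api_key=None, compress=False):
    """Build (but do not send) a POST request carrying one event as JSON."""
    body = json.dumps(event).encode("utf-8")
    headers = {"Content-Type": "application/json"}

    if api_key:
        # Bearer token, as configured through auth.apiKey.
        headers["Authorization"] = f"Bearer {api_key}"

    if compress:
        # Assumption: a gzip-compressed body is flagged with Content-Encoding.
        body = gzip.compress(body)
        headers["Content-Encoding"] = "gzip"

    # The endpoint is appended to the base url.
    target = url.rstrip("/") + "/" + endpoint.lstrip("/")
    return urllib.request.Request(target, data=body, headers=headers, method="POST")


# Hypothetical, heavily trimmed event used only to show the call.
sample_event = {"eventType": "START", "job": {"namespace": "example", "name": "demo_job"}}
request = build_request(sample_event, "https://backend:5000", api_key="secret_token_123")
```

Sending the request would be a call to urllib.request.urlopen(request, timeout=5), which is left out here so the sketch has no network side effects.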
Examples
Yaml Config
transport:
  type: http
  url: https://backend:5000
  endpoint: api/v1/lineage
  timeout: 5
  verify: false
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
  compression: gzip
  retry:
    total: 5
    read: 5
    connect: 5
    backoff_factor: 0.3
    status_forcelist: [500, 502, 503, 504]
    allowed_methods: ["HEAD", "POST"]
Python Code
from openlineage.client import OpenLineageClient
from openlineage.client.transport.http import ApiKeyTokenProvider, HttpConfig, HttpCompression, HttpTransport

# Same settings as the YAML config, without the retry section.
http_config = HttpConfig(
    url="https://backend:5000",
    endpoint="api/v1/lineage",
    timeout=5,
    verify=False,
    auth=ApiKeyTokenProvider({"apiKey": "f048521b-dfe8-47cd-9c65-0cb07d57591e"}),
    compression=HttpCompression.GZIP,
)

# Hand the configured transport to the client.
client = OpenLineageClient(transport=HttpTransport(http_config))
Console
This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.