Python Client
openlineage.client.client module
- class openlineage.client.client.OpenLineageClientOptions(timeout=5.0, verify=True, api_key=None, adapter=None)
Bases:
object
- Parameters:
timeout (float)
verify (bool)
api_key (str | None)
adapter (HTTPAdapter | None)
- timeout: float
- verify: bool
- api_key: str | None
- adapter: HTTPAdapter | None
- class openlineage.client.client.OpenLineageConfig(transport=NOTHING, facets=NOTHING, filters=NOTHING, tags=NOTHING)
Bases:
object
- Parameters:
transport (dict[str, Any] | None)
facets (FacetsConfig)
filters (list[FilterConfig])
tags (TagsConfig)
- transport: dict[str, Any] | None
- facets: FacetsConfig
- filters: list[FilterConfig]
- tags: TagsConfig
- classmethod from_dict(params)
- Parameters:
params (dict[str, Any])
- Return type:
- class openlineage.client.client.OpenLineageClient(url=None, options=None, session=None, transport=None, factory=None, *, config=None)
Bases:
object
- Parameters:
url (str | None)
options (OpenLineageClientOptions | None)
session (Session | None)
transport (Transport | None)
factory (TransportFactory | None)
config (dict[str, Any] | None)
- DYNAMIC_ENV_VARS_PREFIX = 'OPENLINEAGE__'
- DEFAULT_URL_TRANSPORT_NAME = 'default_http'
- classmethod from_environment()
- Return type:
TypeVar(_T, bound=OpenLineageClient)
- classmethod from_dict(config)
- Parameters:
config (dict[str, str])
- Return type:
TypeVar(_T, bound=OpenLineageClient)
- filter_event(event)
Applies the filters defined in the config to the event. Returns None if the event is filtered out; otherwise returns the event.
- Parameters:
event (Event)
- Return type:
Event | None
- emit(event)
- Parameters:
event (Union[run.RunEvent, run.DatasetEvent, run.JobEvent, event_v2.RunEvent, event_v2.DatasetEvent, event_v2.JobEvent])
- Return type:
None
- close(timeout=-1.0)
Closes the transport, waiting until all events are processed or the timeout is reached.
- Parameters:
timeout (float) – Timeout in seconds. -1 blocks until the last event is processed; 0 means no timeout.
- Return type:
bool
- property config: OpenLineageConfig
Retrieves the OpenLineage configuration.
This property returns the content of the OpenLineage YAML config file. The configuration is determined by merging the following sources, in order of precedence:
1. User-defined configuration passed to the client constructor.
2. A YAML config file located in one of the following paths:
   - The path specified by the OPENLINEAGE_CONFIG environment variable.
   - The current working directory.
   - $HOME/.openlineage.
3. Environment variables with the OPENLINEAGE__ prefix.
If the configuration is not already loaded, it is constructed by merging the sources above. If a TypeError occurs while parsing the configuration, a ValueError is raised indicating that the structure of the config does not match the expected format.
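The OPENLINEAGE__ environment variables can be pictured as a nested dictionary. The following is a minimal sketch, not the client's own parser: the helper `env_to_config` is hypothetical, and it assumes that a variable name is split on double underscores into lowercase nested keys, and that values which parse as JSON (numbers, booleans, objects) are decoded while everything else stays a string.

```python
import json
from typing import Any

PREFIX = "OPENLINEAGE__"  # same value as OpenLineageClient.DYNAMIC_ENV_VARS_PREFIX


def env_to_config(environ: dict[str, str]) -> dict[str, Any]:
    """Hypothetical helper: map OPENLINEAGE__A__B=value to {"a": {"b": value}}."""
    config: dict[str, Any] = {}
    for key, raw in environ.items():
        if not key.startswith(PREFIX):
            continue  # unrelated variables are ignored
        path = [part.lower() for part in key[len(PREFIX):].split("__")]
        try:
            value: Any = json.loads(raw)  # assumption: JSON values are decoded
        except json.JSONDecodeError:
            value = raw  # anything else is kept as a plain string
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})  # create intermediate levels on demand
        node[path[-1]] = value
    return config


if __name__ == "__main__":
    print(env_to_config({
        "OPENLINEAGE__TRANSPORT__TYPE": "http",
        "OPENLINEAGE__TRANSPORT__URL": "http://localhost:5000",
        "OPENLINEAGE__TRANSPORT__TIMEOUT": "5",
        "HOME": "/root",
    }))
```

Under these assumptions, the resulting dictionary is the lowest-precedence layer that the YAML file and the constructor argument are merged on top of.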
- add_environment_facets(event)
Adds environment variables as facets to the event object.
- Parameters:
event (Union[run.RunEvent, run.DatasetEvent, run.JobEvent, event_v2.RunEvent, event_v2.DatasetEvent, event_v2.JobEvent])
- Return type:
Union[run.RunEvent, run.DatasetEvent, run.JobEvent, event_v2.RunEvent, event_v2.DatasetEvent, event_v2.JobEvent]
- update_event_tags_facets(event)
Creates or updates job and run tag facets based on user-supplied environment variables.
- Parameters:
event (Union[run.RunEvent, run.DatasetEvent, run.JobEvent, event_v2.RunEvent, event_v2.DatasetEvent, event_v2.JobEvent])
- Return type:
Union[run.RunEvent, run.DatasetEvent, run.JobEvent, event_v2.RunEvent, event_v2.DatasetEvent, event_v2.JobEvent]
openlineage.client.event_v2 module
- class openlineage.client.event_v2.BaseEvent(*, eventTime, producer='')
Bases:
RedactMixin
- Parameters:
eventTime (str)
producer (str)
- eventTime: str
The time the event occurred at.
- producer: str
- schemaURL: str
- property skip_redact: list[str]
- eventtime_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- producer_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- schemaurl_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.event_v2.Dataset(namespace, name, *, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[str, DatasetFacet] | None)
- namespace: str
The namespace containing that dataset
- name: str
The unique name for that dataset within that namespace
- facets: dict[str, DatasetFacet] | None
The facets for this dataset
- class openlineage.client.event_v2.DatasetEvent(*, eventTime, producer='', dataset)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
dataset (StaticDataset)
- dataset: StaticDataset
- class openlineage.client.event_v2.InputDataset(namespace, name, inputFacets=NOTHING, *, facets=NOTHING)
Bases:
Dataset
An input dataset
- Parameters:
namespace (str)
name (str)
inputFacets (dict[str, InputDatasetFacet] | None)
facets (dict[str, DatasetFacet] | None)
- inputFacets: dict[str, InputDatasetFacet] | None
The input facets for this dataset.
- class openlineage.client.event_v2.Job(namespace, name, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[str, JobFacet] | None)
- namespace: str
The namespace containing that job
- name: str
The unique name for that job within that namespace
- facets: dict[str, JobFacet] | None
The job facets.
- class openlineage.client.event_v2.JobEvent(*, eventTime, producer='', job, inputs=NOTHING, outputs=NOTHING)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
job (Job)
inputs (list[InputDataset] | None)
outputs (list[OutputDataset] | None)
- job: Job
- inputs: list[InputDataset] | None
The set of input datasets.
- outputs: list[OutputDataset] | None
The set of output datasets.
- class openlineage.client.event_v2.OutputDataset(namespace, name, outputFacets=NOTHING, *, facets=NOTHING)
Bases:
Dataset
An output dataset
- Parameters:
namespace (str)
name (str)
outputFacets (dict[str, OutputDatasetFacet] | None)
facets (dict[str, DatasetFacet] | None)
- outputFacets: dict[str, OutputDatasetFacet] | None
The output facets for this dataset
- class openlineage.client.event_v2.Run(runId, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
runId (str)
facets (dict[str, RunFacet] | None)
- runId: str
The globally unique ID of the run associated with the job.
- facets: dict[str, RunFacet] | None
The run facets.
- runid_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.event_v2.RunEvent(*, eventTime, producer='', run, job, eventType=None, inputs=NOTHING, outputs=NOTHING)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
run (Run)
job (Job)
eventType (EventType | None)
inputs (list[InputDataset] | None)
outputs (list[OutputDataset] | None)
- run: Run
- job: Job
- eventType: EventType | None
The current transition of the run state. Each run must issue one START event and one COMPLETE, ABORT, or FAIL event. Additional events with the OTHER eventType can be added to the same run, for example to send additional metadata after the run is complete.
- inputs: list[InputDataset] | None
The set of input datasets.
- outputs: list[OutputDataset] | None
The set of output datasets.
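The lifecycle rule for eventType can be expressed as a small check over the sequence of event types emitted for one run. This is a hypothetical helper, not part of the client: it mirrors the EventType values locally and reads the rule as exactly one START event and exactly one COMPLETE, ABORT, or FAIL event, with RUNNING and OTHER events allowed in any number.

```python
from enum import Enum


class EventType(Enum):
    """Local mirror of the EventType values listed in this reference."""
    START = "START"
    RUNNING = "RUNNING"
    COMPLETE = "COMPLETE"
    ABORT = "ABORT"
    FAIL = "FAIL"
    OTHER = "OTHER"


TERMINAL = {EventType.COMPLETE, EventType.ABORT, EventType.FAIL}


def has_valid_lifecycle(events: list[EventType]) -> bool:
    """Hypothetical check: one START, one terminal event; RUNNING/OTHER unrestricted."""
    starts = events.count(EventType.START)
    terminals = sum(1 for event in events if event in TERMINAL)
    return starts == 1 and terminals == 1


# A run that sends extra metadata (OTHER) after completing is still valid.
print(has_valid_lifecycle([EventType.START, EventType.RUNNING, EventType.COMPLETE, EventType.OTHER]))
```

Ordering (START before the terminal event) is deliberately not checked here, since the rule as stated only concerns the number of events.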
- openlineage.client.event_v2.RunState
Alias of EventType.
- openlineage.client.event_v2.set_producer(producer)
- Parameters:
producer (str)
- Return type:
None
openlineage.client.facet module
- openlineage.client.facet.set_producer(producer)
- Parameters:
producer (str)
- Return type:
None
- class openlineage.client.facet.BaseFacet
Bases:
RedactMixin
- property skip_redact: list[str]
- class openlineage.client.facet.NominalTimeRunFacet(nominalStartTime, nominalEndTime=None)
Bases:
BaseFacet
- Parameters:
nominalStartTime (str)
nominalEndTime (Optional[str])
- nominalStartTime: str
- nominalEndTime: Optional[str]
- class openlineage.client.facet.ParentRunFacet(run, job)
Bases:
BaseFacet
- Parameters:
run (dict[Any, Any])
job (dict[Any, Any])
- run: dict[Any, Any]
- job: dict[Any, Any]
- classmethod create(runId, namespace, name)
- Parameters:
runId (str)
namespace (str)
name (str)
- Return type:
- class openlineage.client.facet.DocumentationJobFacet(description)
Bases:
BaseFacet
- Parameters:
description (str)
- description: str
- class openlineage.client.facet.SourceCodeLocationJobFacet(type, url)
Bases:
BaseFacet
- Parameters:
type (str)
url (str)
- type: str
- url: str
- class openlineage.client.facet.SqlJobFacet(query)
Bases:
BaseFacet
- Parameters:
query (str)
- query: str
- class openlineage.client.facet.DocumentationDatasetFacet(description)
Bases:
BaseFacet
- Parameters:
description (str)
- description: str
- class openlineage.client.facet.SchemaField(name, type, description=None)
Bases:
RedactMixin
- Parameters:
name (str)
type (str)
description (Optional[str])
- name: str
- type: str
- description: Optional[str]
- class openlineage.client.facet.SchemaDatasetFacet(fields)
Bases:
BaseFacet
- Parameters:
fields (list[SchemaField])
- fields: list[SchemaField]
- class openlineage.client.facet.DataSourceDatasetFacet(name, uri)
Bases:
BaseFacet
- Parameters:
name (str)
uri (str)
- name: str
- uri: str
- class openlineage.client.facet.OutputStatisticsOutputDatasetFacet(rowCount=None, size=None, fileCount=None)
Bases:
BaseFacet
- Parameters:
rowCount (Optional[int])
size (Optional[int])
fileCount (Optional[int])
- rowCount: Optional[int]
- size: Optional[int]
- fileCount: Optional[int]
- class openlineage.client.facet.ColumnMetric(nullCount=None, distinctCount=None, sum=None, count=None, min=None, max=None, quantiles=None)
Bases:
object
- Parameters:
nullCount (Optional[int])
distinctCount (Optional[int])
sum (Optional[int])
count (Optional[int])
min (Optional[float])
max (Optional[float])
quantiles (Optional[dict[str, float]])
- nullCount: Optional[int]
- distinctCount: Optional[int]
- sum: Optional[int]
- count: Optional[int]
- min: Optional[float]
- max: Optional[float]
- quantiles: Optional[dict[str, float]]
- class openlineage.client.facet.DataQualityMetricsInputDatasetFacet(rowCount=None, bytes=None, fileCount=None, columnMetrics=NOTHING)
Bases:
BaseFacet
- Parameters:
rowCount (Optional[int])
bytes (Optional[int])
fileCount (Optional[int])
columnMetrics (dict[str, ColumnMetric])
- rowCount: Optional[int]
- bytes: Optional[int]
- fileCount: Optional[int]
- columnMetrics: dict[str, ColumnMetric]
- class openlineage.client.facet.Assertion(assertion, success, column=None)
Bases:
RedactMixin
- Parameters:
assertion (str)
success (bool)
column (Optional[str])
- assertion: str
- success: bool
- column: Optional[str]
- class openlineage.client.facet.DataQualityAssertionsDatasetFacet(assertions)
Bases:
BaseFacet
This facet represents asserted expectations on a dataset or its columns.
- Parameters:
assertions (list[Assertion])
- assertions: list[Assertion]
- class openlineage.client.facet.SourceCodeJobFacet(language, source)
Bases:
BaseFacet
This facet represents source code that the job executed.
- Parameters:
language (str)
source (str)
- language: str
- source: str
- class openlineage.client.facet.ExternalQueryRunFacet(externalQueryId, source)
Bases:
BaseFacet
- Parameters:
externalQueryId (str)
source (str)
- externalQueryId: str
- source: str
- class openlineage.client.facet.ErrorMessageRunFacet(message, programmingLanguage, stackTrace=None)
Bases:
BaseFacet
This facet represents an error message that was the result of a job run.
- Parameters:
message (str)
programmingLanguage (str)
stackTrace (Optional[str])
- message: str
- programmingLanguage: str
- stackTrace: Optional[str]
- class openlineage.client.facet.SymlinksDatasetFacetIdentifiers(namespace, name, type)
Bases:
object
- Parameters:
namespace (str)
name (str)
type (str)
- namespace: str
- name: str
- type: str
- class openlineage.client.facet.SymlinksDatasetFacet(identifiers=NOTHING)
Bases:
BaseFacet
This facet represents dataset symlink names.
- Parameters:
identifiers (list[SymlinksDatasetFacetIdentifiers])
- identifiers: list[SymlinksDatasetFacetIdentifiers]
- class openlineage.client.facet.StorageDatasetFacet(storageLayer, fileFormat)
Bases:
BaseFacet
This facet represents the storage layer and file format of a dataset.
- Parameters:
storageLayer (str)
fileFormat (str)
- storageLayer: str
- fileFormat: str
- class openlineage.client.facet.OwnershipJobFacetOwners(name, type=None)
Bases:
object
- Parameters:
name (str)
type (Optional[str])
- name: str
- type: Optional[str]
- class openlineage.client.facet.OwnershipJobFacet(owners=NOTHING)
Bases:
BaseFacet
This facet represents ownership of a job.
- Parameters:
owners (list[OwnershipJobFacetOwners])
- owners: list[OwnershipJobFacetOwners]
- class openlineage.client.facet.JobTypeJobFacet(processingType, integration, jobType)
Bases:
BaseFacet
This facet represents job type properties.
- Parameters:
processingType (str)
integration (str)
jobType (str)
- processingType: str
- integration: str
- jobType: str
- class openlineage.client.facet.DatasetVersionDatasetFacet(datasetVersion)
Bases:
BaseFacet
This facet represents the version of a dataset.
- Parameters:
datasetVersion (str)
- datasetVersion: str
- class openlineage.client.facet.LifecycleStateChange(value)
Bases:
Enum
An enumeration.
- ALTER = 'ALTER'
- CREATE = 'CREATE'
- DROP = 'DROP'
- OVERWRITE = 'OVERWRITE'
- RENAME = 'RENAME'
- TRUNCATE = 'TRUNCATE'
- class openlineage.client.facet.LifecycleStateChangeDatasetFacetPreviousIdentifier(name, namespace)
Bases:
object
- Parameters:
name (str)
namespace (str)
- name: str
- namespace: str
- class openlineage.client.facet.LifecycleStateChangeDatasetFacet(lifecycleStateChange, previousIdentifier)
Bases:
BaseFacet
This facet represents lifecycle state changes of a dataset.
- Parameters:
lifecycleStateChange (LifecycleStateChange)
previousIdentifier (LifecycleStateChangeDatasetFacetPreviousIdentifier)
- lifecycleStateChange: LifecycleStateChange
- previousIdentifier: LifecycleStateChangeDatasetFacetPreviousIdentifier
- class openlineage.client.facet.OwnershipDatasetFacetOwners(name, type)
Bases:
object
- Parameters:
name (str)
type (str)
- name: str
- type: str
- class openlineage.client.facet.OwnershipDatasetFacet(owners=NOTHING)
Bases:
BaseFacet
This facet represents ownership of a dataset.
- Parameters:
owners (list[OwnershipDatasetFacetOwners])
- owners: list[OwnershipDatasetFacetOwners]
- class openlineage.client.facet.ColumnLineageDatasetFacetFieldsAdditionalInputFields(namespace, name, field)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
field (str)
- namespace: str
- name: str
- field: str
- class openlineage.client.facet.ColumnLineageDatasetFacetFieldsAdditional(inputFields, transformationDescription, transformationType)
Bases:
object
- Parameters:
inputFields (ClassVar[list[ColumnLineageDatasetFacetFieldsAdditionalInputFields]])
transformationDescription (str)
transformationType (str)
- inputFields: ClassVar[list[ColumnLineageDatasetFacetFieldsAdditionalInputFields]]
- transformationDescription: str
- transformationType: str
- class openlineage.client.facet.ColumnLineageDatasetFacet(fields=NOTHING)
Bases:
BaseFacet
This facet contains column lineage of a dataset.
- Parameters:
fields (dict[str, ColumnLineageDatasetFacetFieldsAdditional])
- fields: dict[str, ColumnLineageDatasetFacetFieldsAdditional]
- class openlineage.client.facet.ProcessingEngineRunFacet(version, name, openlineageAdapterVersion)
Bases:
BaseFacet
- Parameters:
version (str)
name (str)
openlineageAdapterVersion (str)
- version: str
- name: str
- openlineageAdapterVersion: str
- class openlineage.client.facet.ExtractionError(errorMessage, stackTrace, task, taskNumber)
Bases:
BaseFacet
- Parameters:
errorMessage (str)
stackTrace (Optional[str])
task (Optional[str])
taskNumber (Optional[int])
- errorMessage: str
- stackTrace: Optional[str]
- task: Optional[str]
- taskNumber: Optional[int]
- class openlineage.client.facet.ExtractionErrorRunFacet(totalTasks, failedTasks, errors)
Bases:
BaseFacet
- Parameters:
totalTasks (int)
failedTasks (int)
errors (list[ExtractionError])
- totalTasks: int
- failedTasks: int
- errors: list[ExtractionError]
openlineage.client.facet_v2 module
- class openlineage.client.facet_v2.BaseFacet(*, producer='')
Bases:
RedactMixin
All fields of the base facet are prefixed with _ to avoid name conflicts in facets.
- Parameters:
producer (str)
- property skip_redact: list[str]
- with_additional_properties(**kwargs)
Returns an updated instance of the class with the additional properties added.
- Parameters:
kwargs (dict[str, Any])
- Return type:
- class openlineage.client.facet_v2.DatasetFacet(*, producer='', deleted=None)
Bases:
BaseFacet
A Dataset Facet
- Parameters:
producer (str)
deleted (bool | None)
- class openlineage.client.facet_v2.InputDatasetFacet(*, producer='')
Bases:
BaseFacet
An Input Dataset Facet
- Parameters:
producer (str)
- class openlineage.client.facet_v2.JobFacet(*, producer='', deleted=None)
Bases:
BaseFacet
A Job Facet
- Parameters:
producer (str)
deleted (bool | None)
- class openlineage.client.facet_v2.OutputDatasetFacet(*, producer='')
Bases:
BaseFacet
An Output Dataset Facet
- Parameters:
producer (str)
- class openlineage.client.facet_v2.RunFacet(*, producer='')
Bases:
BaseFacet
A Run Facet
- Parameters:
producer (str)
- openlineage.client.facet_v2.set_producer(producer)
- Parameters:
producer (str)
- Return type:
None
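`with_additional_properties` returns an updated facet instance that carries extra properties. The sketch below imitates that idea with the standard `dataclasses` module; the library's facets are attrs classes, so treat this as an illustration of the behavior under that assumption, not the actual implementation. `DocumentationFacet` and the `owner` property are hypothetical.

```python
from dataclasses import asdict, dataclass, field, fields, make_dataclass
from typing import Any


@dataclass
class DocumentationFacet:
    """Hypothetical facet used only for this illustration."""
    description: str


def with_additional_properties(instance: Any, **kwargs: Any) -> Any:
    """Return a new instance of a derived class that also carries the extra properties."""
    base = type(instance)
    existing = {f.name for f in fields(instance)}
    # Declare each new property as an optional field on a subclass of the original type.
    extra = [(name, Any, field(default=None)) for name in kwargs if name not in existing]
    derived = make_dataclass(base.__name__, extra, bases=(base,))
    # Existing values are copied over; keyword arguments win on name clashes.
    return derived(**{**asdict(instance), **kwargs})


facet = DocumentationFacet(description="Daily sales rollup")
extended = with_additional_properties(facet, owner="analytics-team")
print(extended)
```

Deriving a subclass keeps `isinstance` checks against the original facet type working, while the original instance is left unchanged.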
openlineage.client.filter module
- class openlineage.client.filter.FilterConfig(type=None, match=None, regex=None)
Bases:
object
- Parameters:
type (str | None)
match (str | None)
regex (str | None)
- type: str | None
- match: str | None
- regex: str | None
- class openlineage.client.filter.Filter
Bases:
object
- filter_event(event)
- Parameters:
event (RunEventType)
- Return type:
RunEventType | None
- class openlineage.client.filter.ExactMatchFilter(match)
Bases:
Filter
- Parameters:
match (str)
- filter_event(event)
- Parameters:
event (RunEventType)
- Return type:
RunEventType | None
- class openlineage.client.filter.RegexFilter(regex)
Bases:
Filter
- Parameters:
regex (str)
- filter_event(event)
- Parameters:
event (RunEventType)
- Return type:
RunEventType | None
- openlineage.client.filter.create_filter(conf)
- Parameters:
conf (FilterConfig)
- Return type:
Filter | None
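A rough picture of how the filter classes fit together is sketched below. It assumes that the `type` values `"exact"` and `"regex"` select `ExactMatchFilter` and `RegexFilter`, that both compare against the job name, and that a matching event is dropped (returned as `None`), consistent with the `RunEventType | None` return type. `FakeRunEvent` and `apply_filters` are hypothetical stand-ins, not library names.

```python
import re
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class FakeRunEvent:
    """Hypothetical stand-in for a run event; only the job name matters here."""
    job_name: str


@dataclass
class FilterConfig:
    type: Optional[str] = None
    match: Optional[str] = None
    regex: Optional[str] = None


class ExactMatchFilter:
    def __init__(self, match: str) -> None:
        self.match = match

    def filter_event(self, event: FakeRunEvent) -> Optional[FakeRunEvent]:
        # None means "drop the event"; otherwise it passes through unchanged.
        return None if event.job_name == self.match else event


class RegexFilter:
    def __init__(self, regex: str) -> None:
        self.regex = re.compile(regex)

    def filter_event(self, event: FakeRunEvent) -> Optional[FakeRunEvent]:
        return None if self.regex.match(event.job_name) else event


AnyFilter = Union[ExactMatchFilter, RegexFilter]


def create_filter(conf: FilterConfig) -> Optional[AnyFilter]:
    if conf.type == "exact" and conf.match:
        return ExactMatchFilter(conf.match)
    if conf.type == "regex" and conf.regex:
        return RegexFilter(conf.regex)
    return None  # unknown or incomplete configuration


def apply_filters(event: FakeRunEvent, filters: list[AnyFilter]) -> Optional[FakeRunEvent]:
    """Drop the event as soon as any filter rejects it (hypothetical helper)."""
    for f in filters:
        if f.filter_event(event) is None:
            return None
    return event


filters = [
    create_filter(FilterConfig(type="exact", match="healthcheck")),
    create_filter(FilterConfig(type="regex", regex=r"tmp_.*")),
]
print(apply_filters(FakeRunEvent("daily_sales"), filters))
```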
openlineage.client.run module
- class openlineage.client.run.RunState(value)
Bases:
Enum
An enumeration.
- START = 'START'
- RUNNING = 'RUNNING'
- COMPLETE = 'COMPLETE'
- ABORT = 'ABORT'
- FAIL = 'FAIL'
- OTHER = 'OTHER'
- class openlineage.client.run.Dataset(namespace, name, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[Any, Any])
- namespace: str
- name: str
- facets: dict[Any, Any]
- class openlineage.client.run.InputDataset(namespace, name, facets=NOTHING, inputFacets=NOTHING)
Bases:
Dataset
- Parameters:
namespace (str)
name (str)
facets (dict[Any, Any])
inputFacets (dict[Any, Any])
- inputFacets: dict[Any, Any]
- class openlineage.client.run.OutputDataset(namespace, name, facets=NOTHING, outputFacets=NOTHING)
Bases:
Dataset
- Parameters:
namespace (str)
name (str)
facets (dict[Any, Any])
outputFacets (dict[Any, Any])
- outputFacets: dict[Any, Any]
- class openlineage.client.run.DatasetEvent(eventTime, producer, schemaURL, dataset)
Bases:
RedactMixin
- Parameters:
eventTime (str)
producer (str)
schemaURL (str)
dataset (Dataset)
- eventTime: str
- producer: str
- schemaURL: str
- dataset: Dataset
- class openlineage.client.run.Job(namespace, name, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[Any, Any])
- namespace: str
- name: str
- facets: dict[Any, Any]
- class openlineage.client.run.JobEvent(eventTime, producer, schemaURL, job, inputs=NOTHING, outputs=NOTHING)
Bases:
RedactMixin
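- Parameters:
eventTime (str)
producer (str)
schemaURL (str)
job (Job)
inputs (Optional[list[Dataset]])
outputs (Optional[list[Dataset]])
- eventTime: str
- producer: str
- schemaURL: str
- job: Job
- inputs: Optional[list[Dataset]]
- outputs: Optional[list[Dataset]]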
- class openlineage.client.run.Run(runId, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
runId (str)
facets (dict[Any, Any])
- runId: str
- facets: dict[Any, Any]
- check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.run.RunEvent(eventType, eventTime, run, job, producer, inputs=NOTHING, outputs=NOTHING, schemaURL='https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent')
Bases:
RedactMixin
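- Parameters:
eventType (RunState)
eventTime (str)
run (Run)
job (Job)
producer (str)
inputs (Optional[list[Dataset]])
outputs (Optional[list[Dataset]])
schemaURL (str)
- eventType: RunState
- eventTime: str
- run: Run
- job: Job
- producer: str
- inputs: Optional[list[Dataset]]
- outputs: Optional[list[Dataset]]
- schemaURL: str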
- check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
openlineage.client.serde module
- class openlineage.client.serde.Serde
Bases:
object
- classmethod remove_nulls_and_enums(obj)
- Parameters:
obj (Any)
- Return type:
Any
- classmethod to_dict(obj)
- Parameters:
obj (Any)
- Return type:
dict[Any, Any]
- classmethod to_json(obj)
- Parameters:
obj (Any)
- Return type:
str
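Judging by its name, `remove_nulls_and_enums` strips `None` values and replaces enum members with their values before JSON encoding. The sketch below reproduces that idea with `dataclasses.asdict` in place of the library's attrs-based `to_dict`; `MiniEvent` and `MiniJob` are hypothetical, trimmed-down stand-ins, and the exact rules the library applies (for example, to empty containers) may differ.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Optional


class EventType(Enum):
    START = "START"
    COMPLETE = "COMPLETE"


@dataclass
class MiniJob:
    namespace: str
    name: str
    facets: Optional[dict] = None  # left unset, so it should disappear from the JSON


@dataclass
class MiniEvent:
    """Hypothetical, heavily trimmed event used only for this illustration."""
    eventType: EventType
    job: MiniJob


def remove_nulls_and_enums(obj: Any) -> Any:
    if isinstance(obj, Enum):
        return obj.value  # enums serialize as their string value
    if isinstance(obj, dict):
        return {k: remove_nulls_and_enums(v) for k, v in obj.items() if v is not None}
    if isinstance(obj, list):
        return [remove_nulls_and_enums(v) for v in obj if v is not None]
    return obj


def to_json(obj: Any) -> str:
    return json.dumps(remove_nulls_and_enums(asdict(obj)))


print(to_json(MiniEvent(EventType.START, MiniJob("my-namespace", "my-job"))))
```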
openlineage.client.utils module
- openlineage.client.utils.import_from_string(path)
- Parameters:
path (str)
- Return type:
type[Any]
- openlineage.client.utils.try_import_from_string(path)
- Parameters:
path (str)
- Return type:
type[Any] | None
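`import_from_string` resolves a dotted path to an object, and `try_import_from_string` returns `None` instead of failing. A minimal sketch with `importlib`, assuming the last dotted component names an attribute of the module before it and that every failure surfaces as `ImportError`:

```python
import importlib
from typing import Any, Optional


def import_from_string(path: str) -> Any:
    """Resolve a dotted path such as "package.module.ClassName" to the named object."""
    try:
        module_path, attribute = path.rsplit(".", 1)  # ValueError if there is no dot
        module = importlib.import_module(module_path)
        return getattr(module, attribute)
    except Exception as exc:  # bad path, missing module or missing attribute
        raise ImportError(f"Failed to import {path}") from exc


def try_import_from_string(path: str) -> Optional[Any]:
    """Like import_from_string, but return None instead of raising."""
    try:
        return import_from_string(path)
    except ImportError:
        return None


print(import_from_string("collections.OrderedDict"))
```

This pattern is what makes it possible to name a custom class (for example a transport) in configuration as a plain string.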
- openlineage.client.utils.get_only_specified_fields(clazz, params)
- Parameters:
clazz (type[Any])
params (dict[str, Any])
- Return type:
dict[str, Any]
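From its signature, `get_only_specified_fields` presumably keeps only the entries of `params` that correspond to fields declared on `clazz`, which is useful when building a config object from a larger dictionary. The sketch assumes that behavior and uses a hypothetical dataclass `HttpConfig` instead of the library's attrs classes.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class HttpConfig:
    """Hypothetical config class used only for this illustration."""
    url: str
    timeout: float = 5.0
    endpoint: Optional[str] = None


def get_only_specified_fields(clazz: type, params: dict[str, Any]) -> dict[str, Any]:
    """Keep only the entries of params that name a field of clazz."""
    field_names = {f.name for f in fields(clazz)}
    return {key: value for key, value in params.items() if key in field_names}


raw = {"type": "http", "url": "http://localhost:5000", "timeout": 10}
config = HttpConfig(**get_only_specified_fields(HttpConfig, raw))  # "type" is dropped
print(config)
```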
- openlineage.client.utils.deep_merge_dicts(dict1, dict2)
Deep merges two dictionaries.
For keys that exist in both dictionaries, the value from dict2 takes precedence. If both values are dictionaries themselves, they are merged recursively. Only dictionaries are merged this way; values of other types, such as lists, are not handled properly.
- Parameters:
dict1 (dict[Any, Any])
dict2 (dict[Any, Any])
- Return type:
dict[Any, Any]
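The merge rule for deep_merge_dicts can be sketched as follows. This is an illustrative re-implementation, not the library's source: dict2 wins on conflicts, nested dictionaries are merged, and in this sketch a list from dict2 simply replaces the one from dict1 instead of being combined.

```python
from typing import Any


def deep_merge_dicts(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> dict[Any, Any]:
    """Illustrative deep merge: dict2 wins on conflicts, nested dicts are merged."""
    merged = dict1.copy()  # dict1 itself is left untouched
    for key, value in dict2.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = deep_merge_dicts(merged[key], value)  # recurse into nested dicts
        else:
            merged[key] = value  # scalars and lists from dict2 replace dict1's value
    return merged


yaml_config = {"transport": {"type": "http", "url": "http://localhost:5000"}, "filters": [{"type": "exact", "match": "a"}]}
user_config = {"transport": {"url": "http://lineage:5000"}, "filters": [{"type": "regex", "regex": "tmp_.*"}]}
print(deep_merge_dicts(yaml_config, user_config))
```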
- class openlineage.client.utils.RedactMixin
Bases:
object
- property skip_redact: list[str]
openlineage.client.uuid module
- openlineage.client.uuid.generate_new_uuid(instant=None)
Generate new UUID for an instant of time. Each function call returns a new UUID value.
The UUID version is an implementation detail and should not be relied on. It is currently [UUIDv7](https://datatracker.ietf.org/doc/rfc9562/), so for increasing instant values, each returned UUID is greater than the previous one.
Based on the uuid6 library implementation (oittaa/uuid6-python, MIT License), with a few changes.
Added in v1.15.0
- Parameters:
instant (datetime | None) – instant of time used to generate UUID. If not provided, current time is used.
- Return type:
UUID
- Returns:
UUID
- openlineage.client.uuid.generate_static_uuid(instant, data)
Generate UUID for instant of time and input data. Calling function with same arguments always produces the same result.
The UUID version is an implementation detail and should not be relied on. It is currently [UUIDv7](https://datatracker.ietf.org/doc/rfc9562/), so for increasing instant values, each returned UUID is greater than the previous one. The only difference from RFC 9562 is that the least significant bytes are not random but are derived from a SHA-1 hash of the input data.
Based on the uuid6 library implementation (oittaa/uuid6-python, MIT License), with a few changes.
Added in v1.15.0
- Parameters:
instant (datetime) – instant of time used to generate the UUID.
data (bytes) – input data to generate the random part from.
- Return type:
UUID
- Returns:
UUID
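The UUIDv7 layout from RFC 9562 places a 48-bit Unix timestamp in milliseconds in the most significant bits, followed by the version and variant bits and 74 bits of random data. The sketch below is a hypothetical re-implementation, not the library's code: `new_uuid_sketch` fills those bits from `os.urandom`, and `static_uuid_sketch` fills them from a SHA-1 hash of the input data (which bits of the hash are used is an assumption). It shows why later instants sort higher and why equal inputs give equal UUIDs.

```python
import hashlib
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional


def _pack_uuid7(unix_ms: int, tail80: int) -> uuid.UUID:
    """Lay out RFC 9562 UUIDv7 fields around 80 bits of caller-supplied data."""
    rand_a = tail80 >> 68              # top 12 of the 80 bits
    rand_b = tail80 & ((1 << 62) - 1)  # low 62 bits
    value = (
        (unix_ms & ((1 << 48) - 1)) << 80  # 48-bit Unix timestamp in milliseconds
        | 0x7 << 76                        # version 7
        | rand_a << 64
        | 0b10 << 62                       # RFC 9562 variant
        | rand_b
    )
    return uuid.UUID(int=value)


def new_uuid_sketch(instant: Optional[datetime] = None) -> uuid.UUID:
    if instant is None:
        instant = datetime.now(timezone.utc)
    unix_ms = int(instant.timestamp() * 1000)
    return _pack_uuid7(unix_ms, int.from_bytes(os.urandom(10), "big"))


def static_uuid_sketch(instant: datetime, data: bytes) -> uuid.UUID:
    unix_ms = int(instant.timestamp() * 1000)
    digest = hashlib.sha1(data).digest()[:10]  # assumption: first 80 bits of SHA-1
    return _pack_uuid7(unix_ms, int.from_bytes(digest, "big"))


t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(new_uuid_sketch(t1) < new_uuid_sketch(t1 + timedelta(seconds=1)))
print(static_uuid_sketch(t1, b"job-a") == static_uuid_sketch(t1, b"job-a"))
```

Two random UUIDs generated within the same millisecond are not ordered by this sketch; only the timestamp prefix guarantees ordering across different milliseconds.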
openlineage.client.generated.base module
- openlineage.client.generated.base.set_producer(producer)
- Parameters:
producer (str)
- Return type:
None
- class openlineage.client.generated.base.BaseEvent(*, eventTime, producer='')
Bases:
RedactMixin
- Parameters:
eventTime (str)
producer (str)
- eventTime: str
The time the event occurred at.
- producer: str
- schemaURL: str
- property skip_redact: list[str]
- eventtime_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- producer_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- schemaurl_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.generated.base.BaseFacet(*, producer='')
Bases:
RedactMixin
All fields of the base facet are prefixed with _ to avoid name conflicts in facets.
- Parameters:
producer (str)
- property skip_redact: list[str]
- with_additional_properties(**kwargs)
Returns an updated instance of the class with the additional properties added.
- Parameters:
kwargs (dict[str, Any])
- Return type:
- class openlineage.client.generated.base.Dataset(namespace, name, *, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[str, DatasetFacet] | None)
- namespace: str
The namespace containing that dataset
- name: str
The unique name for that dataset within that namespace
- facets: dict[str, DatasetFacet] | None
The facets for this dataset
- class openlineage.client.generated.base.DatasetEvent(*, eventTime, producer='', dataset)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
dataset (StaticDataset)
- dataset: StaticDataset
- class openlineage.client.generated.base.DatasetFacet(*, producer='', deleted=None)
Bases:
BaseFacet
A Dataset Facet
- Parameters:
producer (str)
deleted (bool | None)
- class openlineage.client.generated.base.EventType(value)
Bases:
Enum
The current transition of the run state. Each run must issue one START event and one COMPLETE, ABORT, or FAIL event. Additional events with the OTHER eventType can be added to the same run, for example to send additional metadata after the run is complete.
- START = 'START'
- RUNNING = 'RUNNING'
- COMPLETE = 'COMPLETE'
- ABORT = 'ABORT'
- FAIL = 'FAIL'
- OTHER = 'OTHER'
- class openlineage.client.generated.base.InputDataset(namespace, name, inputFacets=NOTHING, *, facets=NOTHING)
Bases:
Dataset
An input dataset
- Parameters:
namespace (str)
name (str)
inputFacets (dict[str, InputDatasetFacet] | None)
facets (dict[str, DatasetFacet] | None)
- inputFacets: dict[str, InputDatasetFacet] | None
The input facets for this dataset.
- class openlineage.client.generated.base.InputDatasetFacet(*, producer='')
Bases:
BaseFacet
An Input Dataset Facet
- Parameters:
producer (str)
- class openlineage.client.generated.base.Job(namespace, name, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
facets (dict[str, JobFacet] | None)
- namespace: str
The namespace containing that job
- name: str
The unique name for that job within that namespace
- facets: dict[str, JobFacet] | None
The job facets.
- class openlineage.client.generated.base.JobEvent(*, eventTime, producer='', job, inputs=NOTHING, outputs=NOTHING)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
job (Job)
inputs (list[InputDataset] | None)
outputs (list[OutputDataset] | None)
- job: Job
- inputs: list[InputDataset] | None
The set of input datasets.
- outputs: list[OutputDataset] | None
The set of output datasets.
- class openlineage.client.generated.base.JobFacet(*, producer='', deleted=None)
Bases:
BaseFacet
A Job Facet
- Parameters:
producer (str)
deleted (bool | None)
- class openlineage.client.generated.base.OutputDataset(namespace, name, outputFacets=NOTHING, *, facets=NOTHING)
Bases:
Dataset
An output dataset
- Parameters:
namespace (str)
name (str)
outputFacets (dict[str, OutputDatasetFacet] | None)
facets (dict[str, DatasetFacet] | None)
- outputFacets: dict[str, OutputDatasetFacet] | None
The output facets for this dataset
- class openlineage.client.generated.base.OutputDatasetFacet(*, producer='')
Bases:
BaseFacet
An Output Dataset Facet
- Parameters:
producer (str)
- class openlineage.client.generated.base.Run(runId, facets=NOTHING)
Bases:
RedactMixin
- Parameters:
runId (str)
facets (dict[str, RunFacet] | None)
- runId: str
The globally unique ID of the run associated with the job.
- facets: dict[str, RunFacet] | None
The run facets.
- runid_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.generated.base.RunEvent(*, eventTime, producer='', run, job, eventType=None, inputs=NOTHING, outputs=NOTHING)
Bases:
BaseEvent
- Parameters:
eventTime (str)
producer (str)
run (Run)
job (Job)
eventType (EventType | None)
inputs (list[InputDataset] | None)
outputs (list[OutputDataset] | None)
- run: Run
- job: Job
- eventType: EventType | None
The current transition of the run state. Each run must issue one START event and one COMPLETE, ABORT, or FAIL event. Additional events with the OTHER eventType can be added to the same run, for example to send additional metadata after the run is complete.
- inputs: list[InputDataset] | None
The set of input datasets.
- outputs: list[OutputDataset] | None
The set of output datasets.
- class openlineage.client.generated.base.RunFacet(*, producer='')
Bases:
BaseFacet
A Run Facet
- Parameters:
producer (str)
- class openlineage.client.generated.base.StaticDataset(namespace, name, *, facets=NOTHING)
Bases:
Dataset
A Dataset sent within static metadata events
- Parameters:
namespace (str)
name (str)
facets (dict[str, DatasetFacet] | None)
openlineage.client.generated.column_lineage_dataset module
- class openlineage.client.generated.column_lineage_dataset.ColumnLineageDatasetFacet(fields, dataset=NOTHING, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
fields (dict[str, Fields])
dataset (list[InputField] | None)
producer (str)
deleted (bool | None)
- fields: dict[str, Fields]
Column level lineage that maps output fields into input fields used to evaluate them.
- dataset: list[InputField] | None
Column level lineage that affects the whole dataset. This includes filtering, sorting, grouping (aggregates), joining, window functions, etc.
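The shape of this facet can be illustrated with a plain dictionary that follows the field names documented here (`fields`, `inputFields`, `transformations`, and the dataset-wide `dataset` list). The payload, namespace, and helper `inputs_affecting` are hypothetical; the `DIRECT` and `INDIRECT` type values come from the `Transformation` class documented in this module.

```python
from typing import Any

# Hypothetical facet payload: total_amount is computed from orders.amount,
# and a filter on orders.status affects the whole output dataset.
facet: dict[str, Any] = {
    "fields": {
        "total_amount": {
            "inputFields": [
                {
                    "namespace": "postgres://db:5432",
                    "name": "public.orders",
                    "field": "amount",
                    "transformations": [{"type": "DIRECT", "description": "SUM(amount)"}],
                }
            ]
        }
    },
    "dataset": [
        {
            "namespace": "postgres://db:5432",
            "name": "public.orders",
            "field": "status",
            "transformations": [{"type": "INDIRECT", "description": "WHERE status = 'paid'"}],
        }
    ],
}


def inputs_affecting(facet: dict[str, Any], output_field: str) -> set[tuple[str, str, str]]:
    """Collect (dataset name, input field, transformation type) for one output column,
    including dataset-wide inputs such as filters."""
    per_field = facet["fields"].get(output_field, {}).get("inputFields", [])
    entries = per_field + facet.get("dataset", [])
    return {
        (entry["name"], entry["field"], t["type"])
        for entry in entries
        for t in entry.get("transformations") or []
    }


print(sorted(inputs_affecting(facet, "total_amount")))
```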
- class openlineage.client.generated.column_lineage_dataset.Fields(inputFields, transformationDescription=None, transformationType=None)
Bases:
RedactMixin
- Parameters:
inputFields (list[InputField])
transformationDescription (str | None)
transformationType (str | None)
- inputFields: list[InputField]
- transformationDescription: str | None
A string representation of the transformation applied.
- transformationType: str | None
IDENTITY|MASKED reflects a clearly defined behavior. IDENTITY: exactly the same as the input; MASKED: no original data available (for example, a hash of PII).
- with_additional_properties(**kwargs)
Returns an updated instance of the class with the additional properties added.
- Parameters:
kwargs (dict[str, Any])
- Return type:
- class openlineage.client.generated.column_lineage_dataset.InputField(namespace, name, field, transformations=NOTHING)
Bases:
RedactMixin
Represents a single dependency on some field (column).
- Parameters:
namespace (str)
name (str)
field (str)
transformations (list[Transformation] | None)
- namespace: str
The input dataset namespace
- name: str
The input dataset name
- field: str
The input field
- transformations: list[Transformation] | None
- with_additional_properties(**kwargs)
Returns an updated instance of the class with the additional properties added.
- Parameters:
kwargs (dict[str, Any])
- Return type:
- class openlineage.client.generated.column_lineage_dataset.Transformation(type, subtype=None, description=None, masking=None)
Bases:
RedactMixin
- Parameters:
type (str)
subtype (str | None)
description (str | None)
masking (bool | None)
- type: str
DIRECT, INDIRECT
- Type:
The type of the transformation. Allowed values are
- subtype: str | None
The subtype of the transformation
- description: str | None
a string representation of the transformation applied
- masking: bool | None
is transformation masking the data or not
- with_additional_properties(**kwargs)
Add additional properties to updated class instance.
- Parameters:
kwargs (dict[str, Any])
- Return type:
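To make the nesting of Fields, InputField and Transformation concrete, here is a stdlib-only sketch of the data shape these classes model, plus a helper that lists the upstream columns an output column draws on directly. It assumes output columns are keyed by name, and the dataset names, subtype strings (AGGREGATION, FILTER) and the `direct_upstreams` helper are illustrative assumptions, not part of the client API. With the real client you would build the same shape from `Fields`, `InputField` and `Transformation` instances.

```python
from typing import Any

# Hypothetical lineage for one output column, "total", keyed by output column name.
column_lineage: dict[str, dict[str, Any]] = {
    "total": {
        "inputFields": [
            {
                "namespace": "postgres://db:5432",
                "name": "public.orders",
                "field": "amount",
                # DIRECT: the value of "total" is computed from "amount".
                "transformations": [
                    {"type": "DIRECT", "subtype": "AGGREGATION", "masking": False}
                ],
            },
            {
                "namespace": "postgres://db:5432",
                "name": "public.orders",
                "field": "status",
                # INDIRECT: "status" only filters which rows contribute.
                "transformations": [
                    {"type": "INDIRECT", "subtype": "FILTER", "masking": False}
                ],
            },
        ]
    }
}


def direct_upstreams(fields: dict[str, Any]) -> set[tuple[str, str, str]]:
    """Return (namespace, name, field) for inputs with at least one DIRECT transformation."""
    result = set()
    for input_field in fields["inputFields"]:
        # transformations may be missing or None, as the attribute type allows.
        transformations = input_field.get("transformations") or []
        if any(t["type"] == "DIRECT" for t in transformations):
            result.add((input_field["namespace"], input_field["name"], input_field["field"]))
    return result


print(direct_upstreams(column_lineage["total"]))
# {('postgres://db:5432', 'public.orders', 'amount')}
```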
openlineage.client.generated.data_quality_assertions_dataset module
- class openlineage.client.generated.data_quality_assertions_dataset.Assertion(assertion, success, column=None)
Bases:
RedactMixin
- Parameters:
assertion (str)
success (bool)
column (str | None)
- assertion: str
Type of expectation test that dataset is subjected to
- success: bool
- column: str | None
Column that expectation is testing. It should match the name provided in SchemaDatasetFacet. If column field is empty, then expectation refers to whole dataset.
- class openlineage.client.generated.data_quality_assertions_dataset.DataQualityAssertionsDatasetFacet(assertions, *, producer='')
Bases:
InputDatasetFacet
List of tests performed on the dataset or its columns, and their results.
- Parameters:
assertions (list[Assertion])
producer (str)
- assertions: list[Assertion]
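A consumer of this facet typically wants the failed tests grouped by column, remembering that an empty column means the assertion applies to the whole dataset. The stdlib sketch below works on plain dicts shaped like `Assertion(assertion, success, column)`; the assertion names and the `failed_checks` helper are hypothetical.

```python
from typing import Optional

# Hypothetical assertion results, shaped like Assertion(assertion, success, column).
assertions = [
    {"assertion": "not_null", "success": True, "column": "id"},
    {"assertion": "unique", "success": False, "column": "id"},
    {"assertion": "row_count_between", "success": True, "column": None},
]


def failed_checks(results: list[dict]) -> dict[Optional[str], list[str]]:
    """Group failed assertion names by column; None stands for the whole dataset."""
    failures: dict[Optional[str], list[str]] = {}
    for result in results:
        if not result["success"]:
            failures.setdefault(result.get("column"), []).append(result["assertion"])
    return failures


print(failed_checks(assertions))  # {'id': ['unique']}
```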
openlineage.client.generated.data_quality_metrics_input_dataset module
- class openlineage.client.generated.data_quality_metrics_input_dataset.ColumnMetrics(nullCount=None, distinctCount=None, sum=None, count=None, min=None, max=None, quantiles=NOTHING)
Bases:
RedactMixin
- Parameters:
nullCount (int | None)
distinctCount (int | None)
sum (float | None)
count (float | None)
min (float | None)
max (float | None)
quantiles (dict[str, float] | None)
- nullCount: int | None
The number of null values in this column for the rows evaluated
- distinctCount: int | None
The number of distinct values in this column for the rows evaluated
- sum: float | None
The total sum of values in this column for the rows evaluated
- count: float | None
The number of values in this column
- min: float | None
- max: float | None
- quantiles: dict[str, float] | None
The property key is the quantile. Examples: 0.1 0.25 0.5 0.75 1
- class openlineage.client.generated.data_quality_metrics_input_dataset.DataQualityMetricsInputDatasetFacet(columnMetrics, rowCount=None, bytes=None, fileCount=None, *, producer='')
Bases:
InputDatasetFacet
- Parameters:
columnMetrics (dict[str, ColumnMetrics])
rowCount (int | None)
bytes (int | None)
fileCount (int | None)
producer (str)
- columnMetrics: dict[str, ColumnMetrics]
The property key is the column name
- rowCount: int | None
The number of rows evaluated
- bytes: int | None
The size in bytes
- fileCount: int | None
The number of files evaluated
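The sketch below shows one way a producer might compute ColumnMetrics-style values for a single column with the standard library. Two choices are assumptions rather than documented rules: `count` counts non-null values only, and quantiles use the nearest-rank method. Quantile keys are strings, matching the `dict[str, float]` type, and `count` is a float because the attribute is typed that way.

```python
import math
from typing import Optional, Sequence

QUANTILE_KEYS = ("0.1", "0.25", "0.5", "0.75", "1")


def column_metrics(values: Sequence[Optional[float]]) -> dict:
    """Compute ColumnMetrics-style statistics for one column.

    Assumptions: `count` excludes nulls and quantiles use the nearest-rank
    method; a real producer may define both differently.
    """
    present = sorted(v for v in values if v is not None)
    metrics = {
        "nullCount": len(values) - len(present),
        "distinctCount": len(set(present)),
        "sum": float(sum(present)),
        "count": float(len(present)),
        "min": present[0] if present else None,
        "max": present[-1] if present else None,
        "quantiles": {},
    }
    for key in QUANTILE_KEYS:
        if present:
            # Nearest rank: smallest value with at least q * n values at or below it.
            rank = max(1, math.ceil(float(key) * len(present)))
            metrics["quantiles"][key] = present[rank - 1]
    return metrics


values = [3.0, None, 1.0, 2.0, 2.0]
facet_fields = {"rowCount": len(values), "columnMetrics": {"amount": column_metrics(values)}}
```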
openlineage.client.generated.dataset_version_dataset module
- class openlineage.client.generated.dataset_version_dataset.DatasetVersionDatasetFacet(datasetVersion, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
datasetVersion (str)
producer (str)
deleted (bool | None)
- datasetVersion: str
The version of the dataset.
openlineage.client.generated.datasource_dataset module
- class openlineage.client.generated.datasource_dataset.DatasourceDatasetFacet(name=None, uri=None, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
name (str | None)
uri (str | None)
producer (str)
deleted (bool | None)
- name: str | None
- uri: str | None
- uri_check(attribute, value)
- Parameters:
attribute (
str
)value (
str
)
- Return type:
None
openlineage.client.generated.documentation_dataset module
- class openlineage.client.generated.documentation_dataset.DocumentationDatasetFacet(description, contentType=None, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
description (str)
contentType (str | None)
producer (str)
deleted (bool | None)
- description: str
The description of the dataset.
- contentType: str | None
MIME type of the description field content.
openlineage.client.generated.documentation_job module
- class openlineage.client.generated.documentation_job.DocumentationJobFacet(description, contentType=None, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
description (str)
contentType (str | None)
producer (str)
deleted (bool | None)
- description: str
The description of the job.
- contentType: str | None
MIME type of the description field content.
openlineage.client.generated.error_message_run module
- class openlineage.client.generated.error_message_run.ErrorMessageRunFacet(message, programmingLanguage, stackTrace=None, *, producer='')
Bases:
RunFacet
- Parameters:
message (str)
programmingLanguage (str)
stackTrace (str | None)
producer (str)
- message: str
A human-readable string representing the error message generated by the observed system.
- programmingLanguage: str
Programming language the observed system uses.
- stackTrace: str | None
A language-specific stack trace generated by the observed system.
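In a Python producer, the three fields usually come straight from a caught exception. This stdlib sketch collects them into a dict whose keys match the `ErrorMessageRunFacet(message, programmingLanguage, stackTrace)` parameters; the `error_message_fields` helper and the `"python"` language string are assumptions for illustration.

```python
import traceback


def error_message_fields(exc: BaseException) -> dict:
    """Collect ErrorMessageRunFacet-style fields from a caught Python exception."""
    stack = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return {
        "message": str(exc),
        "programmingLanguage": "python",
        "stackTrace": stack,
    }


try:
    1 / 0
except ZeroDivisionError as exc:
    fields = error_message_fields(exc)

print(fields["message"])  # division by zero
```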
openlineage.client.generated.external_query_run module
- class openlineage.client.generated.external_query_run.ExternalQueryRunFacet(externalQueryId, source, *, producer='')
Bases:
RunFacet
- Parameters:
externalQueryId (str)
source (str)
producer (str)
- externalQueryId: str
Identifier for the external system
- source: str
Source of the external query
openlineage.client.generated.extraction_error_run module
- class openlineage.client.generated.extraction_error_run.Error(errorMessage, stackTrace=None, task=None, taskNumber=None)
Bases:
RedactMixin
- Parameters:
errorMessage (str)
stackTrace (str | None)
task (str | None)
taskNumber (int | None)
- errorMessage: str
Text representation of extraction error message.
- stackTrace: str | None
Stack trace of extraction error message
- task: str | None
Text representation of the task that failed. This can be, for example, a SQL statement that the parser could not interpret.
- taskNumber: int | None
Order of task (counted from 0).
- class openlineage.client.generated.extraction_error_run.ExtractionErrorRunFacet(totalTasks, failedTasks, errors, *, producer='')
Bases:
RunFacet
- Parameters:
totalTasks (int)
failedTasks (int)
errors (list[Error])
producer (str)
- totalTasks: int
The number of distinguishable tasks in a run that were processed by OpenLineage, whether successfully or not. Those could be, for example, distinct SQL statements.
- failedTasks: int
The number of distinguishable tasks in a run that OpenLineage failed to process. Those could be, for example, distinct SQL statements.
- errors: list[Error]
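The relationship between `totalTasks`, `failedTasks` and the zero-based `taskNumber` can be sketched as follows. The parser passed in is a hypothetical stand-in (`toy_parse` only accepts SELECT statements); a real integration would call its own SQL parser and catch that parser's specific errors.

```python
from typing import Callable


def extraction_errors(statements: list[str], parse: Callable[[str], object]) -> dict:
    """Run a parser over each statement and summarise failures ExtractionErrorRunFacet-style."""
    errors = []
    for number, statement in enumerate(statements):  # taskNumber counts from 0
        try:
            parse(statement)
        except Exception as exc:  # a real integration would catch its parser's errors
            errors.append({"errorMessage": str(exc), "task": statement, "taskNumber": number})
    return {"totalTasks": len(statements), "failedTasks": len(errors), "errors": errors}


def toy_parse(sql: str) -> str:
    """Hypothetical stand-in parser that only understands SELECT statements."""
    if not sql.lstrip().upper().startswith("SELECT"):
        raise ValueError(f"cannot parse: {sql}")
    return sql


summary = extraction_errors(
    ["SELECT 1", "MERGE INTO t USING s ON t.id = s.id", "SELECT 2"], toy_parse
)
```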
openlineage.client.generated.job_type_job module
- class openlineage.client.generated.job_type_job.JobTypeJobFacet(processingType, integration, jobType=None, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
processingType (str)
integration (str)
jobType (str | None)
producer (str)
deleted (bool | None)
- processingType: str
Job processing type, like BATCH or STREAMING.
- integration: str
OpenLineage integration type of this job, for example SPARK|DBT|AIRFLOW|FLINK.
- jobType: str | None
Run type, for example QUERY|COMMAND|DAG|TASK|JOB|MODEL. This is an integration-specific field.
openlineage.client.generated.lifecycle_state_change_dataset module
- class openlineage.client.generated.lifecycle_state_change_dataset.LifecycleStateChange(value)
Bases:
Enum
The lifecycle state change.
- ALTER = 'ALTER'
- CREATE = 'CREATE'
- DROP = 'DROP'
- OVERWRITE = 'OVERWRITE'
- RENAME = 'RENAME'
- TRUNCATE = 'TRUNCATE'
- class openlineage.client.generated.lifecycle_state_change_dataset.LifecycleStateChangeDatasetFacet(lifecycleStateChange, previousIdentifier=None, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
lifecycleStateChange (LifecycleStateChange)
previousIdentifier (PreviousIdentifier | None)
producer (str)
deleted (bool | None)
- lifecycleStateChange: LifecycleStateChange
The lifecycle state change.
- previousIdentifier: PreviousIdentifier | None
Previous name of the dataset in case of renaming it.
- class openlineage.client.generated.lifecycle_state_change_dataset.PreviousIdentifier(name, namespace)
Bases:
RedactMixin
Previous name of the dataset in case of renaming it.
- Parameters:
name (str)
namespace (str)
- name: str
- namespace: str
openlineage.client.generated.nominal_time_run module
- class openlineage.client.generated.nominal_time_run.NominalTimeRunFacet(nominalStartTime, nominalEndTime=None, *, producer='')
Bases:
RunFacet
- Parameters:
nominalStartTime (str)
nominalEndTime (str | None)
producer (str)
- nominalStartTime: str
An ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601) timestamp representing the nominal start time (included) of the run, also known as the schedule time.
- nominalEndTime: str | None
An ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601) timestamp representing the nominal end time (excluded) of the run. It should be the nominal start time of the next run.
- nominalstarttime_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- nominalendtime_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
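For a scheduled job, the nominal window is the half-open interval [start, end), and the end equals the next run's start. The stdlib sketch below computes that window for a daily schedule and includes a rough ISO-8601 parser. It approximates, and is not, the client's `nominalstarttime_check`/`nominalendtime_check`, whose exact rules are not documented here; `parse_iso8601` and `daily_nominal_window` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone


def parse_iso8601(value: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC.

    datetime.fromisoformat only accepts 'Z' from Python 3.11 onwards, so it is
    normalised to '+00:00' first.
    """
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)  # raises ValueError on malformed input


def daily_nominal_window(schedule_day: datetime) -> dict:
    """Nominal times for a daily run: the end (excluded) is the next run's start."""
    start = schedule_day.replace(hour=0, minute=0, second=0, microsecond=0)
    end = start + timedelta(days=1)
    return {"nominalStartTime": start.isoformat(), "nominalEndTime": end.isoformat()}


window = daily_nominal_window(datetime(2024, 3, 1, 6, 30, tzinfo=timezone.utc))
print(window)
```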
openlineage.client.generated.output_statistics_output_dataset module
- class openlineage.client.generated.output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(rowCount=None, size=None, fileCount=None, *, producer='')
Bases:
OutputDatasetFacet
- Parameters:
rowCount (int | None)
size (int | None)
fileCount (int | None)
producer (str)
- rowCount: int | None
The number of rows written to the dataset
- size: int | None
The size in bytes written to the dataset
- fileCount: int | None
The number of files written to the dataset
openlineage.client.generated.ownership_dataset module
- class openlineage.client.generated.ownership_dataset.Owner(name, type=None)
Bases:
RedactMixin
- Parameters:
name (str)
type (str | None)
- name: str
The identifier of the owner of the dataset. It is recommended to define this as a URN, for example application:foo, user:jdoe, or team:data.
- type: str | None
The type of ownership (optional)
- class openlineage.client.generated.ownership_dataset.OwnershipDatasetFacet(owners=NOTHING, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
owners (list[Owner] | None)
producer (str)
deleted (bool | None)
- owners: list[Owner] | None
The owners of the dataset.
openlineage.client.generated.ownership_job module
- class openlineage.client.generated.ownership_job.Owner(name, type=None)
Bases:
RedactMixin
- Parameters:
name (str)
type (str | None)
- name: str
The identifier of the owner of the job. It is recommended to define this as a URN, for example application:foo, user:jdoe, or team:data.
- type: str | None
The type of ownership (optional)
- class openlineage.client.generated.ownership_job.OwnershipJobFacet(owners=NOTHING, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
owners (list[Owner] | None)
producer (str)
deleted (bool | None)
- owners: list[Owner] | None
The owners of the job.
openlineage.client.generated.parent_run module
- class openlineage.client.generated.parent_run.Job(namespace, name)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
- namespace: str
The namespace containing that job
- name: str
The unique name for that job within that namespace
- class openlineage.client.generated.parent_run.ParentRunFacet(run, job, root=None, *, producer='')
Bases:
RunFacet
The ID of the parent run and job, if and only if this run was spawned from another run (for example, the DAG run scheduling its tasks).
- Parameters:
run (Run)
job (Job)
root (Root | None)
producer (str)
- run: Run
- job: Job
- root: Root | None
- classmethod create(runId, namespace, name)
- Parameters:
runId (str)
namespace (str)
name (str)
- Return type:
- class openlineage.client.generated.parent_run.Root(run, job)
Bases:
RedactMixin
- Parameters:
run (RootRun)
job (RootJob)
- run: RootRun
- job: RootJob
- class openlineage.client.generated.parent_run.RootJob(namespace, name)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
- namespace: str
The namespace containing the root job
- name: str
The unique name of the root job within that namespace
- class openlineage.client.generated.parent_run.RootRun(runId)
Bases:
RedactMixin
- Parameters:
runId (str)
- runId: str
The globally unique ID of the root run associated with the root job.
- runid_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
- class openlineage.client.generated.parent_run.Run(runId)
Bases:
RedactMixin
- Parameters:
runId (str)
- runId: str
The globally unique ID of the run associated with the job.
- runid_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
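The sketch below shows the data a ParentRunFacet carries for an orchestrated task: the immediate parent run and job, plus an optional root run and job. It uses plain dicts and does not call `ParentRunFacet.create(runId, namespace, name)`. It assumes run IDs must be UUID strings, which is how the OpenLineage spec defines `runId`; the `runid_check` validators above presumably enforce something similar. The helper name, namespace and job name are hypothetical.

```python
import uuid
from typing import Optional


def parent_fields(run_id: str, namespace: str, name: str,
                  root: Optional[tuple[str, str, str]] = None) -> dict:
    """Build ParentRunFacet-shaped data; root is (runId, namespace, name) of the root run."""
    uuid.UUID(run_id)  # ValueError if the run ID is not a UUID (assumed validation rule)
    facet = {"run": {"runId": run_id}, "job": {"namespace": namespace, "name": name}}
    if root is not None:
        root_run_id, root_namespace, root_name = root
        uuid.UUID(root_run_id)
        facet["root"] = {
            "run": {"runId": root_run_id},
            "job": {"namespace": root_namespace, "name": root_name},
        }
    return facet


# A task whose parent is the DAG run, which is also the root of the hierarchy.
dag_run_id = "3f5e1c2a-8a4b-4c1d-9e2f-1a2b3c4d5e6f"
facet = parent_fields(dag_run_id, "airflow", "daily_etl",
                      root=(dag_run_id, "airflow", "daily_etl"))
```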
openlineage.client.generated.processing_engine_run module
- class openlineage.client.generated.processing_engine_run.ProcessingEngineRunFacet(version, name=None, openlineageAdapterVersion=None, *, producer='')
Bases:
RunFacet
- Parameters:
version (str)
name (str | None)
openlineageAdapterVersion (str | None)
producer (str)
- version: str
Processing engine version, for example the Airflow or Spark version.
- name: str | None
Processing engine name, for example Airflow or Spark.
- openlineageAdapterVersion: str | None
OpenLineage adapter package version, for example the version of the OpenLineage Airflow integration package.
openlineage.client.generated.schema_dataset module
- class openlineage.client.generated.schema_dataset.SchemaDatasetFacet(fields=NOTHING, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
fields (list[SchemaDatasetFacetFields] | None)
producer (str)
deleted (bool | None)
- fields: list[SchemaDatasetFacetFields] | None
The fields of the data source.
- class openlineage.client.generated.schema_dataset.SchemaDatasetFacetFields(name, type=None, description=None, fields=NOTHING)
Bases:
RedactMixin
- Parameters:
name (str)
type (str | None)
description (str | None)
fields (list[SchemaDatasetFacetFields] | None)
- name: str
The name of the field.
- type: str | None
The type of the field.
- description: str | None
The description of the field.
- fields: list[SchemaDatasetFacetFields] | None
Nested struct fields.
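Because `SchemaDatasetFacetFields.fields` is recursive, consumers often flatten it into column paths. This stdlib sketch walks dicts shaped like those fields and yields dotted paths; the dotted-path convention and the type strings are assumptions for illustration, not something the facet prescribes.

```python
from typing import Iterator, Optional


def flatten_schema(fields: list[dict], prefix: str = "") -> Iterator[tuple[str, Optional[str]]]:
    """Yield (dotted.path, type) for every field, descending into nested struct fields."""
    for field in fields:
        path = f"{prefix}{field['name']}"
        yield path, field.get("type")
        # Nested struct members live in the same "fields" attribute, recursively.
        yield from flatten_schema(field.get("fields") or [], prefix=path + ".")


schema_fields = [
    {"name": "id", "type": "BIGINT"},
    {
        "name": "address",
        "type": "STRUCT",
        "fields": [
            {"name": "city", "type": "STRING"},
            {"name": "geo", "type": "STRUCT", "fields": [{"name": "lat", "type": "DOUBLE"}]},
        ],
    },
]

print(list(flatten_schema(schema_fields)))
# [('id', 'BIGINT'), ('address', 'STRUCT'), ('address.city', 'STRING'),
#  ('address.geo', 'STRUCT'), ('address.geo.lat', 'DOUBLE')]
```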
openlineage.client.generated.source_code_job module
- class openlineage.client.generated.source_code_job.SourceCodeJobFacet(language, sourceCode, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
language (str)
sourceCode (str)
producer (str)
deleted (bool | None)
- language: str
Language in which source code of this job was written.
- sourceCode: str
Source code of this job.
openlineage.client.generated.source_code_location_job module
- class openlineage.client.generated.source_code_location_job.SourceCodeLocationJobFacet(type, url, repoUrl=None, path=None, version=None, tag=None, branch=None, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
type (str)
url (str)
repoUrl (str | None)
path (str | None)
version (str | None)
tag (str | None)
branch (str | None)
producer (str)
deleted (bool | None)
- type: str
The source control system
- url: str
The full HTTP URL to locate the file
- repoUrl: str | None
The URL to the repository
- path: str | None
The path in the repo containing the source files
- version: str | None
The current version deployed (not a branch name, the actual unique version)
- tag: str | None
Optional tag name
- branch: str | None
Optional branch name
- url_check(attribute, value)
- Parameters:
attribute (str)
value (str)
- Return type:
None
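The exact rules enforced by `url_check` are not documented here. As a rough stand-in, the stdlib sketch below treats a value as a URL when it has both a scheme and a network location, and pairs it with a hypothetical SourceCodeLocationJobFacet payload. All repository names, paths and the commit hash are made up.

```python
from urllib.parse import urlparse


def looks_like_url(value: str) -> bool:
    """Rough URL check: requires a scheme and a network location."""
    parsed = urlparse(value)
    return bool(parsed.scheme) and bool(parsed.netloc)


# Hypothetical values; keys follow the SourceCodeLocationJobFacet attributes.
source_location = {
    "type": "git",
    "url": "https://github.com/example-org/pipelines/blob/main/jobs/daily_etl.py",
    "repoUrl": "https://github.com/example-org/pipelines",
    "path": "jobs/daily_etl.py",
    "version": "4f2c9e1",
    "branch": "main",
}
```

Note that this heuristic rejects SSH-style remotes such as git@github.com:org/repo.git, which have no scheme; whether the client's own check accepts them is not stated here.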
openlineage.client.generated.sql_job module
- class openlineage.client.generated.sql_job.SQLJobFacet(query, dialect=None, *, producer='', deleted=None)
Bases:
JobFacet
- Parameters:
query (str)
dialect (str | None)
producer (str)
deleted (bool | None)
- query: str
- dialect: str | None
openlineage.client.generated.storage_dataset module
- class openlineage.client.generated.storage_dataset.StorageDatasetFacet(storageLayer, fileFormat=None, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
storageLayer (str)
fileFormat (str | None)
producer (str)
deleted (bool | None)
- storageLayer: str
Storage layer provider with allowed values: iceberg, delta.
- fileFormat: str | None
File format with allowed values: parquet, orc, avro, json, csv, text, xml.
openlineage.client.generated.symlinks_dataset module
- class openlineage.client.generated.symlinks_dataset.Identifier(namespace, name, type)
Bases:
RedactMixin
- Parameters:
namespace (str)
name (str)
type (str)
- namespace: str
The dataset namespace
- name: str
The dataset name
- type: str
Identifier type
- class openlineage.client.generated.symlinks_dataset.SymlinksDatasetFacet(identifiers=NOTHING, *, producer='', deleted=None)
Bases:
DatasetFacet
- Parameters:
identifiers (list[Identifier] | None)
producer (str)
deleted (bool | None)
- identifiers: list[Identifier] | None
openlineage.client.transport.composite module
- class openlineage.client.transport.composite.CompositeConfig(transports, continue_on_failure=True, continue_on_success=True, sort_transports=False)
Bases:
Config
CompositeConfig is a configuration class for CompositeTransport.
- transports
A list or dict of dictionaries, where each dictionary represents the configuration for a child transport. Each dictionary should contain the necessary parameters to initialize a specific transport instance. If dict of dictionaries is passed, keys of the dict will be treated as transport names.
- continue_on_failure
If set to True, the CompositeTransport will attempt to emit the event using all configured transports, regardless of whether any previous transport in the list failed to emit the event. If none of the transports successfully emit the event, an error will still be raised at the end to indicate that no events were emitted. If set to False, an error in transport will halt the emission process for subsequent transports.
- continue_on_success
If True, the CompositeTransport will continue emitting events to all transports even after a successful delivery. If False, it will stop emitting to other transports as soon as the first successful delivery occurs.
- sort_transports
If True, transports will be sorted before emission by priority field. If False, order of transports will not be changed.
- Continue behavior summary:
- continue_on_failure=True, continue_on_success=True:
Always emits to all transports, regardless of successes or failures. Events are delivered everywhere, with no early termination.
- continue_on_failure=True, continue_on_success=False:
Stops on the first successful emission. Failures are ignored and transports continue to be tried until a success is found.
- continue_on_failure=False, continue_on_success=False:
Stops on the first success or the first failure, whichever occurs first.
- continue_on_failure=False, continue_on_success=True:
Stops immediately on the first failure (fail-fast). However, if no failures occur, events will be emitted to all transports, effectively behaving like a “fail-fast all” strategy. This behavior may not align with typical expectations when an early stop after a success is desired; review carefully to ensure it matches the intended delivery semantics.
- Parameters:
transports (list[dict[str, Any]] | dict[str, dict[str, Any]])
continue_on_failure (bool)
continue_on_success (bool)
sort_transports (bool)
- transports: list[dict[str, Any]] | dict[str, dict[str, Any]]
- continue_on_failure: bool
- continue_on_success: bool
- sort_transports: bool
- classmethod from_dict(params)
Create a CompositeConfig object from a dictionary.
- Parameters:
params (dict[str, Any])
- Return type:
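The four continue_on_failure/continue_on_success combinations are easier to compare side by side in a simulation. The stdlib sketch below models only the control flow described in the behavior summary; it does not call CompositeTransport, and raising RuntimeError where emission fails is an assumption about how the failure surfaces. The configuration dict uses the dict-of-dicts form, so its keys become transport names; the child settings are illustrative.

```python
from typing import Sequence

# Dict-of-dicts form: keys become transport names. Child settings are illustrative.
COMPOSITE_CONFIG = {
    "type": "composite",
    "continue_on_failure": True,
    "continue_on_success": True,
    "transports": {
        "local": {"type": "console"},
        "backend": {"type": "http", "url": "http://localhost:5000"},
    },
}


def simulate_emit(outcomes: Sequence[bool], continue_on_failure: bool = True,
                  continue_on_success: bool = True) -> list[int]:
    """Return the indices of child transports that would be tried.

    outcomes[i] says whether child transport i succeeds. Raises RuntimeError
    where the behavior summary says emission fails.
    """
    attempted: list[int] = []
    succeeded = False
    for index, ok in enumerate(outcomes):
        attempted.append(index)
        if ok:
            succeeded = True
            if not continue_on_success:
                break  # stop after the first successful delivery
        elif not continue_on_failure:
            raise RuntimeError(f"transport {index} failed; attempted {attempted}")
    if not succeeded:
        raise RuntimeError("no transport emitted the event")
    return attempted
```

For example, `simulate_emit([False, True, True], continue_on_success=False)` tries the first two transports and stops after the first success.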
- class openlineage.client.transport.composite.CompositeTransport(config)
Bases:
Transport
CompositeTransport is a transport class that emits events using multiple transports.
- Parameters:
config (CompositeConfig)
- kind: str | None = 'composite'
- config_class
alias of
CompositeConfig
- property transports: list[Transport]
Create and return a list of transports based on the config.
- emit(event)
Emit an event using all transports in the config.
- Parameters:
event (Event)
- Return type:
None
- close(timeout=-1)
Closes the transport, waiting for all events to complete until the timeout is reached.
- Params:
timeout: Timeout in seconds. A negative value blocks until the last event is processed, while 0 means it completes immediately.
- Returns:
True if all events were processed before the transport was closed, False if some events were not processed.
- Return type:
bool
- Parameters:
timeout (float)
openlineage.client.transport.console module
- class openlineage.client.transport.console.ConsoleConfig
Bases:
Config
- class openlineage.client.transport.console.ConsoleTransport(config)
Bases:
Transport
- Parameters:
config (ConsoleConfig)
- kind: str | None = 'console'
- config_class
alias of
ConsoleConfig
- emit(event)
- Parameters:
event (Union[RunEvent, DatasetEvent, JobEvent, RunEvent, DatasetEvent, JobEvent])
- Return type:
None
openlineage.client.transport.factory module
- class openlineage.client.transport.factory.DefaultTransportFactory
Bases:
TransportFactory
- register_transport(of_type, clazz)
- Parameters:
of_type (str)
clazz (type[Transport] | str)
- Return type:
None
- create(config=None)
Initializes and returns a transport mechanism based on the provided configuration.
If ‘OPENLINEAGE_DISABLED’ is set to ‘true’, a NoopTransport instance is returned, effectively disabling transport. If a configuration dictionary is provided, transport specified by the config is initialized. If no configuration is provided, the function defaults to a console-based transport, logging a warning and printing events to the console.
- Parameters:
config (dict[str, str] | None)
- Return type:
Transport
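The decision order described for `create` can be summarised as: kill switch first, then the explicit configuration, then a console fallback with a warning. The stdlib sketch below mirrors only that order and returns a transport type name instead of an instance. Treating the OPENLINEAGE_DISABLED value case-insensitively is an assumption; the description above only mentions 'true'. The `choose_transport_type` helper is hypothetical.

```python
import logging
import os
from typing import Optional

log = logging.getLogger(__name__)


def choose_transport_type(config: Optional[dict] = None) -> str:
    """Mirror the documented decision order of DefaultTransportFactory.create()."""
    # Assumption: the comparison is case-insensitive.
    if os.environ.get("OPENLINEAGE_DISABLED", "").lower() == "true":
        return "noop"
    if config:
        return config["type"]  # the 'type' key selects the registered transport class
    log.warning("No transport configured; falling back to console output.")
    return "console"
```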
openlineage.client.transport.file module
- class openlineage.client.transport.file.FileConfig(log_file_path, append=False)
Bases:
Config
- Parameters:
log_file_path (str)
append (bool)
- log_file_path: str
- append: bool = False
- classmethod from_dict(params)
- Parameters:
params (dict[str, Any])
- Return type:
- class openlineage.client.transport.file.FileTransport(config)
Bases:
Transport
- Parameters:
config (FileConfig)
- kind: str | None = 'file'
- config_class
alias of
FileConfig
- emit(event)
- Parameters:
event (
Union
[RunEvent
,DatasetEvent
,JobEvent
,RunEvent
,DatasetEvent
,JobEvent
])- Return type:
None
openlineage.client.transport.http module
- class openlineage.client.transport.http.TokenProvider(config)
Bases:
object
- Parameters:
config (dict[str, str])
- get_bearer()
- Return type:
str | None
- class openlineage.client.transport.http.HttpCompression(value)
Bases:
Enum
An enumeration.
- GZIP = 'gzip'
- class openlineage.client.transport.http.ApiKeyTokenProvider(config)
Bases:
TokenProvider
- Parameters:
config (dict[str, str])
- get_bearer()
- Return type:
str | None
- openlineage.client.transport.http.create_token_provider(auth)
- Parameters:
auth (dict[str, str])
- Return type:
- openlineage.client.transport.http.get_session()
- Return type:
Session
- class openlineage.client.transport.http.HttpConfig(url, endpoint='api/v1/lineage', timeout=5.0, verify=True, auth=NOTHING, compression=None, session=None, adapter=None, custom_headers=NOTHING, retry={'allowed_methods': ['HEAD', 'POST'], 'backoff_factor': 0.3, 'connect': 5, 'read': 5, 'status_forcelist': [500, 502, 503, 504], 'total': 5})
Bases:
Config
- Parameters:
url (str)
endpoint (str)
timeout (float)
verify (bool)
auth (TokenProvider)
compression (HttpCompression | None)
session (Session | None)
adapter (HTTPAdapter | None)
custom_headers (dict[str, str])
retry (dict[str, Any])
- url: str
- endpoint: str
- timeout: float
- verify: bool
- auth: TokenProvider
- compression: HttpCompression | None
- session: Session | None
- adapter: HTTPAdapter | None
- custom_headers: dict[str, str]
- retry: dict[str, Any]
- classmethod from_dict(params)
- Parameters:
params (dict[str, Any])
- Return type:
- classmethod from_options(url, options, session)
- Parameters:
url (str)
options (OpenLineageClientOptions)
session (Session | None)
- Return type:
HttpConfig
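The dict below shows an HTTP transport configuration whose keys mirror the HttpConfig attributes; the values are illustrative. The `auth` sub-dict (an `api_key` type with an `apiKey` field) is an assumption based on ApiKeyTokenProvider, so check the client's configuration docs for the exact keys. The `encode_body` helper sketches what `compression="gzip"` presumably implies for the request body (gzipped JSON with a Content-Encoding header); it is a stdlib illustration, not the transport's code.

```python
import gzip
import json
from typing import Optional

# Keys mirror the HttpConfig attributes; values (and the auth sub-dict) are illustrative.
HTTP_TRANSPORT_CONFIG = {
    "type": "http",
    "url": "http://localhost:5000",
    "endpoint": "api/v1/lineage",
    "timeout": 5.0,
    "verify": True,
    "auth": {"type": "api_key", "apiKey": "<your-api-key>"},
    "compression": "gzip",
    "custom_headers": {"X-Team": "data-platform"},
}


def encode_body(event: dict, compression: Optional[str]) -> tuple[bytes, dict[str, str]]:
    """Serialise an event as JSON, gzip-compressing it when requested (assumed wire format)."""
    body = json.dumps(event).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if compression == "gzip":
        body = gzip.compress(body)
        headers["Content-Encoding"] = "gzip"
    return body, headers
```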
- class openlineage.client.transport.http.HttpTransport(config)
Bases:
Transport
- Parameters:
config (HttpConfig)
- kind: str | None = 'http'
- config_class
alias of
HttpConfig
- emit(event)
- Parameters:
event (Union[RunEvent, DatasetEvent, JobEvent, RunEvent, DatasetEvent, JobEvent])
- Return type:
Response
- property session: Session
- close(timeout=-1)
Closes the transport, waiting for all events to complete until the timeout is reached.
- Params:
timeout: Timeout in seconds. A negative value blocks until the last event is processed, while 0 means it completes immediately.
- Returns:
True if all events were processed before the transport was closed, False if some events were not processed.
- Return type:
bool
- Parameters:
timeout (float)
openlineage.client.transport.kafka module
- class openlineage.client.transport.kafka.KafkaConfig(config, topic, messageKey=None, flush=True)
Bases:
Config
- Parameters:
config (dict[str, str])
topic (str)
messageKey (str | None)
flush (bool)
- config: dict[str, str]
- topic: str
- messageKey: str | None
- flush: bool
- classmethod from_dict(params)
- Parameters:
params (dict[str, Any])
- Return type:
TypeVar(_T, bound=KafkaConfig)
- openlineage.client.transport.kafka.on_delivery(err, msg)
- Parameters:
err (KafkaError)
msg (Message)
- Return type:
None
- class openlineage.client.transport.kafka.KafkaTransport(config)
Bases:
Transport
- Parameters:
config (KafkaConfig)
- kind: str | None = 'kafka'
- config_class
alias of
KafkaConfig
- emit(event)
- Parameters:
event (Event)
- Return type:
None
- wait_for_completion(timeout=-1)
Block until all events are processed or timeout is reached.
- Params:
timeout: Timeout in seconds. -1 means to block until the last event is processed, while 0 means it returns immediately without waiting.
- Returns:
True if all events were processed, False if some events were not processed.
- Return type:
bool
- Parameters:
timeout (float)
- close(timeout=-1)
Closes the transport, waiting for all events to complete until the timeout is reached.
- Params:
timeout: Timeout in seconds. A negative value blocks until the last event is processed, while 0 means it completes immediately.
- Returns:
True if all events were processed before the transport was closed, False if some events were not processed.
- Return type:
bool
- Parameters:
timeout (float)
openlineage.client.transport.msk_iam module
- class openlineage.client.transport.msk_iam.MSKIAMConfig(config, topic, messageKey=None, flush=True, region=None, aws_profile=None, role_arn=None, aws_debug_creds=False)
Bases:
KafkaConfig
- Parameters:
config (dict[str, str])
topic (str)
messageKey (str | None)
flush (bool)
region (str | None)
aws_profile (None | str)
role_arn (None | str)
aws_debug_creds (bool)
- region: str | None
- aws_profile: None | str
- role_arn: None | str
- aws_debug_creds: bool
- class openlineage.client.transport.msk_iam.MSKIAMTransport(config)
Bases:
KafkaTransport
- Parameters:
config (MSKIAMConfig)
- kind: str | None = 'msk-iam'
- config_class
alias of
MSKIAMConfig
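The dicts below sketch Kafka and MSK IAM transport configurations. Keys follow the KafkaConfig and MSKIAMConfig parameters, and the `type` values match the transports' `kind` attributes. The entries inside `config` are assumed to be passed through to the underlying Kafka producer as librdkafka-style settings; the broker addresses, topic and region are placeholders. The helper reports which fields without defaults (`config` and `topic`, per the KafkaConfig signature) are missing.

```python
# Illustrative configurations; keys follow KafkaConfig and MSKIAMConfig.
KAFKA_TRANSPORT_CONFIG = {
    "type": "kafka",
    "topic": "openlineage.events",
    "config": {"bootstrap.servers": "localhost:9092", "acks": "all"},
    "flush": True,
}

MSK_IAM_TRANSPORT_CONFIG = {
    "type": "msk-iam",
    "topic": "openlineage.events",
    "region": "us-east-1",
    "config": {"bootstrap.servers": "b-1.example.kafka.us-east-1.amazonaws.com:9098"},
    "flush": True,
}


def missing_kafka_keys(cfg: dict) -> set[str]:
    """Report required KafkaConfig fields (those without defaults) that are absent."""
    return {"config", "topic"} - set(cfg)
```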
openlineage.client.transport.noop module
- class openlineage.client.transport.noop.NoopConfig
Bases:
Config
- class openlineage.client.transport.noop.NoopTransport(config)
Bases:
Transport
- Parameters:
config (NoopConfig)
- kind: str | None = 'noop'
- config_class
alias of
NoopConfig
- emit(event)
- Parameters:
event (Union[RunEvent, DatasetEvent, JobEvent, RunEvent, DatasetEvent, JobEvent])
- Return type:
None
openlineage.client.transport.transport module
Transport interface for OpenLineage events.
To implement a custom Transport, implement both Config and Transport classes.
- Transport implementation requirements:
Specify class variable config_class that points to the Config class that Transport requires
Implement __init__ that accepts the specified Config class instance
Implement emit method that accepts OpenLineage events
- Config implementation requirements:
Implement from_dict classmethod to create config from dictionary parameters
The config class can have complex attributes, but must be able to instantiate them in from_dict
- Transport instantiation:
TransportFactory instantiates custom transports by looking at the type field in the config
The factory uses this type to determine which transport class to instantiate
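The requirements above can be sketched end to end: a Config with `from_dict`, a Transport with `config_class`, an `__init__` taking that config, an `emit` method, and a registry keyed by the config's `type` field. To keep the sketch runnable without the client installed, `Config` and `Transport` here are minimal stand-ins; real code would subclass `openlineage.client.transport.transport.Config` and `Transport` and register the class with `DefaultTransportFactory.register_transport(of_type, clazz)`. `ListConfig`, `ListTransport` and `create_transport` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, ClassVar


# Minimal stand-ins for the client's Config and Transport base classes.
class Config:
    @classmethod
    def from_dict(cls, params: dict[str, Any]) -> "Config":
        return cls()


class Transport:
    kind: ClassVar[str | None] = None
    config_class: ClassVar[type[Config]] = Config

    def emit(self, event: Any) -> Any:
        raise NotImplementedError


@dataclass
class ListConfig(Config):
    max_events: int = 100

    @classmethod
    def from_dict(cls, params: dict[str, Any]) -> "ListConfig":
        # Build every attribute from plain dictionary values.
        return cls(max_events=int(params.get("max_events", 100)))


class ListTransport(Transport):
    """Hypothetical transport that keeps events in memory, e.g. for tests."""

    kind = "list"
    config_class = ListConfig

    def __init__(self, config: ListConfig) -> None:
        self.config = config
        self.events: list[Any] = []

    def emit(self, event: Any) -> None:
        if len(self.events) < self.config.max_events:
            self.events.append(event)


# Registry keyed by the config's "type" field, as the factory description says.
REGISTRY: dict[str, type[Transport]] = {"list": ListTransport}


def create_transport(config: dict[str, Any]) -> Transport:
    transport_class = REGISTRY[config["type"]]
    return transport_class(transport_class.config_class.from_dict(config))


transport = create_transport({"type": "list", "max_events": 2})
for n in range(3):
    transport.emit({"eventType": "START", "n": n})
```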
- class openlineage.client.transport.transport.Config
Bases:
object
- classmethod from_dict(params)
- Parameters:
params (dict[str, Any])
- Return type:
TypeVar(_T, bound=Config)
- class openlineage.client.transport.transport.Transport
Bases:
object
- kind: str | None = None
- name: str | None = None
- priority: int = 0
- config_class
alias of
Config
- emit(event)
- Parameters:
event (Event)
- Return type:
Any
- close(timeout=-1)
Closes the transport, waiting for all events to complete until the timeout is reached.
- Params:
timeout: Timeout in seconds. A negative value blocks until the last event is processed, while 0 means it completes immediately.
- Returns:
True if all events were processed before the transport was closed, False if some events were not processed.
- Return type:
bool
- Parameters:
timeout (float)
- class openlineage.client.transport.transport.TransportFactory
Bases:
object
- create(config=None)
- Parameters:
config (dict[str, str] | None)
- Return type:
Transport