In data lineage, you often need a reliable and flexible way to send events to more than one destination. This is where the Composite Transport comes in: an OpenLineage feature that aggregates multiple transports and sends each event to every configured target in a defined order.
What is the Composite Transport?
The Composite Transport is a type of transport in OpenLineage that wraps other transports and sends each event to them sequentially, in a defined order. This is useful when the same event needs to reach several targets, such as a logging system and an API endpoint.
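The sequential behavior described above can be pictured with a small, library-free sketch. It is only an illustration, not OpenLineage's implementation: `SequentialFanOut`, the `Emit` alias, and the two lambda targets are hypothetical names invented for this example.

```python
from typing import Callable, List

# Hypothetical stand-in for a transport: any callable that accepts an event.
Emit = Callable[[dict], None]


class SequentialFanOut:
    """Illustrative only; this is not OpenLineage's CompositeTransport."""

    def __init__(self, targets: List[Emit]) -> None:
        self.targets = list(targets)

    def emit(self, event: dict) -> None:
        # Deliver the same event to each target, in configuration order.
        for target in self.targets:
            target(event)


received = []  # records (target name, run id) pairs in delivery order
fan_out = SequentialFanOut([
    lambda e: received.append(("log", e["run_id"])),
    lambda e: received.append(("api", e["run_id"])),
])
fan_out.emit({"run_id": "run-1"})
```

After the call, `received` holds one entry per target, with the logging entry ahead of the API entry because that is the order in which the targets were listed.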
Key Features of the Composite Transport
- Flexibility: The Composite Transport can include a variety of other transport types, such as HTTP, Kafka, and more, allowing you to choose the best transport for each destination.
- Convenience: The Composite Transport allows you to configure multiple transports in a single configuration, making it easy to set up and manage your event emission process.
- Redundancy: With the Composite Transport, you can send the same event to multiple destinations, so that a failure at one destination does not have to mean the event is lost.
- Different processing: The Composite Transport allows you to send events to different destinations for different types of processing, such as logging and analytics.
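To make the redundancy point concrete, here is a minimal sketch of sending one event to several targets while tolerating a failure in one of them. It is a conceptual illustration under stated assumptions: `emit_to_all`, its `continue_on_failure` parameter, and `broken_endpoint` are hypothetical names for this example, and how the real Composite Transport reacts to a failing destination depends on its own configuration.

```python
from typing import Callable, List, Tuple

Emit = Callable[[dict], None]


def emit_to_all(
    targets: List[Tuple[str, Emit]],
    event: dict,
    continue_on_failure: bool = True,
) -> List[str]:
    """Try each named target in order; return the names of those that failed."""
    failed = []
    for name, emit in targets:
        try:
            emit(event)
        except Exception:
            failed.append(name)
            if not continue_on_failure:
                # Stop at the first failure and surface it to the caller.
                raise
    return failed


def broken_endpoint(event: dict) -> None:
    # Hypothetical target that is always unreachable.
    raise ConnectionError("endpoint unreachable")


delivered = []  # events received by the backup target
failed = emit_to_all(
    [("api", broken_endpoint), ("backup", delivered.append)],
    {"run_id": "run-1"},
)
```

In this sketch the event still reaches the backup target even though the first target raised, and the caller gets back the list of targets that failed.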
How to Use the Composite Transport
To use the Composite Transport, you can configure its transports in one of two formats: a list of transport configurations or a map of transport configurations. The list format suits a fixed set of transports, while the map format keys each transport by a name (such as first, second, and debug in the YAML config) and is useful when you need to configure transports dynamically.
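The relationship between the two formats can be sketched in plain Python. The helper below is hypothetical and not part of the OpenLineage client; it assumes, for illustration only, that each key in the map format acts as the name of its transport.

```python
from typing import Dict, List, Union

TransportSpec = Dict[str, object]


def normalize_transports(
    transports: Union[List[TransportSpec], Dict[str, TransportSpec]],
) -> List[TransportSpec]:
    """Return a list of transport specs, whichever format was given."""
    if isinstance(transports, dict):
        # Map format: treat each key as the transport's name (an assumption here).
        return [{**spec, "name": name} for name, spec in transports.items()]
    # List format: copy each spec so the caller's data is not mutated later.
    return [dict(spec) for spec in transports]


as_list = normalize_transports([
    {"type": "http", "url": "http://example.com"},
    {"type": "console"},
])
as_map = normalize_transports({
    "first": {"type": "http", "url": "http://example.com"},
    "debug": {"type": "console"},
})
```

Both calls produce an ordered list of specs; the map form additionally carries the `first` and `debug` labels along, which is what makes individual entries easy to refer to.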
Here are some examples of how to configure the Composite Transport in Python, in Java, and in a YAML config file:
- Python Example
- Java Example
- YAML Config
Python Example

from openlineage.client import OpenLineageClient
from openlineage.client.transport.composite import CompositeTransport, CompositeConfig

# List format: events go to Kafka first, then to the console.
config = CompositeConfig.from_dict({
    "type": "composite",
    "transports": [
        {
            "type": "kafka",
            "config": {"bootstrap.servers": "localhost:9092"},
            "topic": "random-topic",
            "messageKey": "key",
            "flush": False,
        },
        {"type": "console"},
    ],
})

# Hand the composite transport to the client like any other transport.
client = OpenLineageClient(transport=CompositeTransport(config))
Java Example

import java.util.Arrays;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.CompositeConfig;
import io.openlineage.client.transports.CompositeTransport;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;

// Configure the HTTP destination.
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://example.com/api");

// Configure the Kafka destination; the setters are called on the instance, not the class.
KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.setTopicName("openlineage.events");
kafkaConfig.setLocalServerId("some-value");

// Second argument: assumed here to be the continue-on-failure flag.
CompositeConfig compositeConfig = new CompositeConfig(Arrays.asList(
  new HttpTransport(httpConfig),
  new KafkaTransport(kafkaConfig)
), true);

OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new CompositeTransport(compositeConfig))
  .build();
YAML Config

transport:
  type: composite
  transports:
    first:
      type: http
      url: http://example.com
      auth:
        type: api_key
        apiKey: random_token
      compression: gzip
    second:
      type: http
      url: http://localhost:5050
      endpoint: api/v1/lineage
    debug:
      type: console
Conclusion
The Composite Transport enhances event distribution in OpenLineage by providing flexibility, redundancy, and ease of configuration. Whether you need to send events to multiple targets or ensure reliable delivery in the event of failure, this feature simplifies your workflow. With support for various transport types, it adapts to diverse event-processing needs.
Ready to learn more? See the OpenLineage documentation for details on configuring the Composite Transport and other advanced features.