Managing a data pipeline means tracking changes. Sometimes changes to your code, sometimes changes to somebody else’s schema, sometimes changes to the contents of the data itself. Sometimes you need to trace the root cause of a problem: somebody changed an int to a string and all the downstream consumers broke.

Sometimes you want to make a change and see how your consumers are affected. Do all the jobs run significantly faster after you filter out unused records? Or did somebody rely on those unused records being present in the data?

Do the recommendation models perform better after you "improved" the data cleaning job upstream? Can you be certain it was your change that improved the performance?

Sometimes the data itself just looks wrong and you need a way to verify that nothing has broken. Why was there a huge drop in traffic to the food delivery site yesterday? Was there an outage you didn't hear about? Competitors outbidding your ads? Or did the website developers simply stop logging some critical event, corrupting every table in your data warehouse?

Typically, we think of data lineage in static terms:

Job A produces Dataset X, which is consumed by Job B, which joins it with Dataset Y and produces Dataset Z, which is consumed by...

It’s a map that we use to get our heads around the dependencies that exist between the datasets we use to make good decisions (how much inventory should I stock in the warehouse to ensure customers get timely deliveries?) or to make technical features our customers will love (how can I compile the perfect road trip playlist given this customer's listening history?).

But data lineage is much more than a static map of inputs and outputs. Real time lineage and faceted metadata give us visibility into how the map changes over time and even allow us to look back in history to see how changes in one part of the map cause ripples in other areas. Taking advantage of some recent changes to the Marquez API, we’ll demonstrate how to diagnose job failures and how to explore the impact of code changes on downstream dependents.

Getting Started

To get started, we need a running instance of Marquez with a little bit of seed data. For these exercises, we'll assume you have a terminal with the programs used in the commands below installed: git, Docker (with docker-compose), curl, and jq.

Download and install any dependencies you don't already have. You'll need the Docker daemon running (see the docs for your platform to get that started). Then check out the Marquez repository from GitHub and run the Docker image locally:

git clone https://github.com/MarquezProject/marquez
cd marquez
./docker/up.sh --seed

This script uses docker-compose to spin up a self-contained installation of Marquez, including a local database container, web frontend, and service instance. It also loads a set of sample data that's useful for exploring the API. You'll know the seed job is done when you see the following line in the output logs:

seed-marquez-with-metadata exited with code 0

Once the seed job is done, we can begin exploring the API.

The Jobs

In a separate terminal window, type the following command:

curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/" | jq . | less

The output returned should look something like the following:

{
  "jobs": [
    {
      "id": {
        "namespace": "food_delivery",
        "name": "example.delivery_times_7_days"
      },
      "type": "BATCH",
      "name": "example.delivery_times_7_days",
      "createdAt": "2021-06-24T21:50:39.229759Z",
      "updatedAt": "2021-06-24T22:05:45.321952Z",
      "namespace": "food_delivery",
      "inputs": [
        {
          "namespace": "food_delivery",
          "name": "public.delivery_7_days"
        }
      ],
      "outputs": [],
      "location": "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/delivery_times_7_days.py",
      "context": {
        "sql": "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n    customer_email, restaurant_id, driver_id)\n  SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n    customer_email, restaurant_id, driver_id\n    FROM delivery_7_days\nGROUP BY restaurant_id\nORDER BY order_delivery_time DESC\n   LIMIT 1;"
      },
      "description": "Determine weekly top delivery times by restaurant.",
      "latestRun": {
        "id": "f4fada30-dfcc-400c-9391-2d7a506b9139",
        "createdAt": "2021-06-24T21:50:59.509739Z",
        "updatedAt": "2021-06-24T22:05:45.321952Z",
        "nominalStartTime": "2021-06-24T22:02:00Z",
        "nominalEndTime": "2021-06-24T22:05:00Z",
        "state": "FAILED",
        "startedAt": "2021-06-24T22:02:39.321952Z",
        "endedAt": "2021-06-24T22:05:45.321952Z",
        "durationMs": 186000,
        "args": {},
        "jobVersion": {
          "namespace": "food_delivery",
          "name": "example.delivery_times_7_days",
          "version": "e9eafa5b-e334-358d-a3b4-61c8d3de75f3"
        },
        "inputVersions": [
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "version": "a40ec54f-b8e1-35f7-b868-58b27383b5ff"
          }
        ],
        "outputVersions": [],
        "context": {
          "sql": "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n    customer_email, restaurant_id, driver_id)\n  SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n    customer_email, restaurant_id, driver_id\n    FROM delivery_7_days\nGROUP BY restaurant_id\nORDER BY order_delivery_time DESC\n   LIMIT 1;"
        },
        "facets": {}
      },
      "facets": {}
    },
   ...
  ]
}

For brevity, I only included a single job: in this case, a job called example.delivery_times_7_days in the food_delivery namespace (which we specified in the curl command). Your output will include many more jobs.

There are a few things in the job output worth noting. The first is the id of the job:

      "id": {
        "namespace": "food_delivery",
        "name": "example.delivery_times_7_days"
      },

There is no version information in the id, as this API refers to the unversioned job information. The job itself is mutable, in the sense that each time you query the API, the content of the job may change as new versions are created.

The response includes the set of input and output datasets, as well as the current job source location:

      "inputs": [
        {
          "namespace": "food_delivery",
          "name": "public.delivery_7_days"
        }
      ],
      "outputs": [],
      "location": "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/delivery_times_7_days.py",

If a new version of the job is created, any or all of these fields can change.

The Job Run

The next thing to notice is the latestRun field. This includes information about the latest Run of this job:

      "latestRun": {
        "id": "f4fada30-dfcc-400c-9391-2d7a506b9139",
        "createdAt": "2021-06-24T21:50:59.509739Z",
        "updatedAt": "2021-06-24T22:05:45.321952Z",
        "nominalStartTime": "2021-06-24T22:02:00Z",
        "nominalEndTime": "2021-06-24T22:05:00Z",
        "state": "FAILED",
        "startedAt": "2021-06-24T22:02:39.321952Z",
        "endedAt": "2021-06-24T22:05:45.321952Z",
        "durationMs": 186000,
        "args": {},
        "jobVersion": {
          "namespace": "food_delivery",
          "name": "example.delivery_times_7_days",
          "version": "e9eafa5b-e334-358d-a3b4-61c8d3de75f3"
        },
        "inputVersions": [
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "version": "a40ec54f-b8e1-35f7-b868-58b27383b5ff"
          }
        ],
        "outputVersions": [],
        "context": {
          "sql": "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n    customer_email, restaurant_id, driver_id)\n  SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n    customer_email, restaurant_id, driver_id\n    FROM delivery_7_days\nGROUP BY restaurant_id\nORDER BY order_delivery_time DESC\n   LIMIT 1;"
        },
        "facets": {}
      },

Here, we see explicit version information in the jobVersion, the inputVersions, and the outputVersions fields. This is included because every Run is tied to exactly one immutable version of a job and one immutable version of each input dataset and each output dataset (it's worth noting that a Run can be tied to one version of a dataset as its input and another version of the same dataset as its output; a SQL MERGE statement is one common use case this supports).

The other important field to notice in the Run structure is the state:

        "state": "FAILED",

Uh-oh. Looks like the last time this job ran, it failed.
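The same check can be run across the whole namespace. The following is a minimal sketch that works on a trimmed, inline copy of the jobs response rather than a live server (the second job and its COMPLETED state are taken from the lineage output later in this post). To run it for real, swap the heredoc for the curl command above.

```shell
# Trimmed stand-in for the jobs response; only the fields the filter reads are kept.
jobs_json=$(cat <<'EOF'
{
  "jobs": [
    {"name": "example.delivery_times_7_days", "latestRun": {"state": "FAILED"}},
    {"name": "example.etl_delivery_7_days", "latestRun": {"state": "COMPLETED"}}
  ]
}
EOF
)

# Keep only the jobs whose most recent run failed and print their names.
failed_jobs=$(printf '%s\n' "$jobs_json" |
  jq -r '.jobs[] | select(.latestRun.state == "FAILED") | .name')

echo "$failed_jobs"
```

Jobs that have never run simply drop out of the result, since their state is not FAILED.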

Tracing Failures

The first question we have when diagnosing a failure is:

Is this the first time it's failed? Or has it been broken a while?

Let's use the API to find out. Checking previous runs is easily accomplished by hitting the job's runs API. Job runs are returned in descending order by start time, so the latest runs should be at the top. Since we only want to check whether (and which) previous runs failed, we can use the following command:

curl "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/example.delivery_times_7_days/runs" | \
  jq '.runs | map({"id": .id, "state": .state})' | less

I get the following output:

[
  {
    "id": "cb436906-1c66-4ce4-b7ac-ceebfd1babf8",
    "state": "FAILED"
  },
  {
    "id": "34bd4d60-82a6-4cac-ad76-815e6d95a93c",
    "state": "COMPLETED"
  },
  {
    "id": "352c67c3-c8d7-4b3a-b7da-8532aa9b8335",
    "state": "COMPLETED"
  },
  {
    "id": "0c62b1cc-2e43-44d0-9443-0a1d9768fece",
    "state": "COMPLETED"
  },
  {
    "id": "5900de19-12f7-4a6e-8118-8e0792d98f65",
    "state": "COMPLETED"
  },
  ...
]

This is an incomplete list of runs, but it's obvious from this sampling that this is the first failure in the recent execution history. What we want to see now is what changed between the last successful run and this one. We'll need to grab the id fields of each of the runs we want to compare. The run ids in the seed data are randomly generated, so they'll be different if you're following along. Grab the run ids with the following shell commands:

FAILED_RUN_ID=$(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/example.delivery_times_7_days/runs" | jq -r '.runs[0].id')
SUCCESSFUL_RUN_ID=$(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/example.delivery_times_7_days/runs" | jq -r '.runs[1].id')
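Indexing .runs[1] works here because only the most recent run failed. If several runs in a row had failed, you would want the newest COMPLETED run instead. Here is a minimal sketch of that selection, using part of the run list shown above as inline sample data. The lowercase variable names are illustrative, and the filter relies on the newest-first ordering described earlier.

```shell
# Inline sample of the runs response (first three runs from the output above).
runs_json=$(cat <<'EOF'
{
  "runs": [
    {"id": "cb436906-1c66-4ce4-b7ac-ceebfd1babf8", "state": "FAILED"},
    {"id": "34bd4d60-82a6-4cac-ad76-815e6d95a93c", "state": "COMPLETED"},
    {"id": "352c67c3-c8d7-4b3a-b7da-8532aa9b8335", "state": "COMPLETED"}
  ]
}
EOF
)

# Newest run overall (runs are returned newest first).
latest_run_id=$(printf '%s\n' "$runs_json" | jq -r '.runs[0].id')

# Newest run that completed, wherever it sits in the list.
last_good_run_id=$(printf '%s\n' "$runs_json" |
  jq -r '[.runs[] | select(.state == "COMPLETED")][0].id')

echo "latest:    $latest_run_id"
echo "last good: $last_good_run_id"
```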

To get a specific run, we call the /jobs/runs API. Since each Run ID is required to be unique, the API doesn't require a namespace or a job name. We can get the failed job run with:

curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq . | less

The output has the same structure as the latestRun field we saw in the jobs API response. Recall that it includes these three important fields: the jobVersion, the inputVersions, and the outputVersions.

        "jobVersion": {
          "namespace": "food_delivery",
          "name": "example.delivery_times_7_days",
          "version": "e9eafa5b-e334-358d-a3b4-61c8d3de75f3"
        },
        "inputVersions": [
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "version": "a40ec54f-b8e1-35f7-b868-58b27383b5ff"
          }
        ],
        "outputVersions": [],

These fields give us what we need to trace the lineage of the specific job runs we want to compare.

Job Versions

The first thing to look at is the jobVersion. Nearly 100% of the time, a job failure can be traced to a code change. Let's compare the job version of the failed run with the job version of the successful one:

diff <(curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq -r '.jobVersion.version') \
     <(curl -s "http://localhost:5000/api/v1/jobs/runs/$SUCCESSFUL_RUN_ID" | jq -r '.jobVersion.version')
1c1
< e9eafa5b-e334-358d-a3b4-61c8d3de75f3
---
> 92d801c0-021e-3c3d-ba18-c9e8504b143d

Right away, we see there is a difference. A number of factors contribute to the job versioning logic in Marquez:

  • The source code location
  • The job context
  • The list of input datasets
  • The list of output datasets

The version generation code is a deterministic function of these four inputs, so if any of them change, the version will change. Let's find out what changed between the two job versions. To do the diff, we ought to get rid of anything we expect to differ ahead of time: the version, the createdAt and updatedAt timestamps, and the latestRun. The version field is also nested within the job version's id field, so we'll omit that too.

FAILED_JOB_VERSION=$(curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq -r '.jobVersion.version')
SUCCESSFUL_JOB_VERSION=$(curl -s "http://localhost:5000/api/v1/jobs/runs/$SUCCESSFUL_RUN_ID" | jq -r '.jobVersion.version')

diff <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/example.delivery_times_7_days/versions/$FAILED_JOB_VERSION" | \
      jq 'del(.["id", "version", "createdAt", "updatedAt", "latestRun"])') \
     <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/jobs/example.delivery_times_7_days/versions/$SUCCESSFUL_JOB_VERSION" | \
      jq 'del(.["id", "version", "createdAt", "updatedAt", "latestRun"])')
14c14,23
<   "outputs": []
---
>   "outputs": [
>     {
>       "namespace": "food_delivery",
>       "name": "public.top_delivery_times"
>     },
>     {
>       "namespace": "food_delivery",
>       "name": "public.discounts"
>     }
>   ]

Oh, interesting! The two job versions differ only in their output datasets. This points to something that should be addressed in the Marquez API: the job version is generated when the run completes, even if the run failed. Sometimes this has no impact on the versioning, as the output datasets can be determined before the job run executes. But sometimes we see impacts like this, where a job run failed before we had a chance to discover the output datasets.

Tracing Upstream Lineage

So what gives? The job code didn't actually change! So what caused the failure?

Here's where the lineage tracking becomes useful. Recall that the run output gave us three interesting fields: the jobVersion, the inputVersions, and the outputVersions. We already know that the outputVersions is empty because the latest failed run didn't have a chance to determine the outputs. But we can take a look at the input datasets.

Dataset Versions

diff <(curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq -r '.inputVersions') \
     <(curl -s "http://localhost:5000/api/v1/jobs/runs/$SUCCESSFUL_RUN_ID" | jq -r '.inputVersions')
5c5
<     "version": "a40ec54f-b8e1-35f7-b868-58b27383b5ff"
---
>     "version": "5e439f1f-1a44-3700-961f-60c79c75a1ec"

Dataset versions work differently from job versions. They don't only change when the structure changes. Every time a job run modifies or writes to a dataset, the dataset version changes. Unless a job schedule is more frequent than its upstream job's schedule (e.g., an hourly job consuming a daily generated dataset), it is expected that each job run consumes a different version of a dataset. To find out if there is a significant difference, we have to compare the two versions with the dataset's versions API.

We know there's only a single input dataset, so we'll keep this simple, but you could also write a loop to check multiple input datasets if needed.
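For reference, such a loop might look like the sketch below. It compares the input versions of two runs using inline stand-ins for the run responses, so it can be tried without a server. The first dataset is the one from this post; the second (public.hypothetical_static) and its version are made up purely to give the loop something unchanged to skip.

```shell
# Inline stand-ins for the failed and successful run responses.
failed_run=$(cat <<'EOF'
{"inputVersions": [
  {"namespace": "food_delivery", "name": "public.delivery_7_days", "version": "a40ec54f-b8e1-35f7-b868-58b27383b5ff"},
  {"namespace": "food_delivery", "name": "public.hypothetical_static", "version": "00000000-0000-0000-0000-000000000001"}
]}
EOF
)
successful_run=$(cat <<'EOF'
{"inputVersions": [
  {"namespace": "food_delivery", "name": "public.delivery_7_days", "version": "5e439f1f-1a44-3700-961f-60c79c75a1ec"},
  {"namespace": "food_delivery", "name": "public.hypothetical_static", "version": "00000000-0000-0000-0000-000000000001"}
]}
EOF
)

# One tab-separated line per input of the failed run: namespace, name, version.
inputs=$(printf '%s\n' "$failed_run" |
  jq -r '.inputVersions[] | [.namespace, .name, .version] | @tsv')

tab=$(printf '\t')
changed_count=0
while IFS="$tab" read -r namespace name failed_version; do
  # Look up the version of the same dataset consumed by the successful run.
  good_version=$(printf '%s\n' "$successful_run" |
    jq -r --arg ns "$namespace" --arg n "$name" \
      '.inputVersions[] | select(.namespace == $ns and .name == $n) | .version')
  if [ "$failed_version" != "$good_version" ]; then
    changed_count=$((changed_count + 1))
    echo "changed: $namespace/$name ($good_version -> $failed_version)"
  fi
done <<EOF
$inputs
EOF

echo "inputs with a different version: $changed_count"
```

Each dataset the loop reports is a candidate for the version-by-version diff that follows.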

In this post, we omit the structure of the datasetVersion, but you can explore it yourself with the following:

FAILED_DATASET_VERSION=$(curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq -r '.inputVersions[0].version')
curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$FAILED_DATASET_VERSION" | jq | less

As with the job versions, we'll omit some of the data we expect to be different in order to produce a useful diff:

FAILED_DATASET_VERSION=$(curl -s "http://localhost:5000/api/v1/jobs/runs/$FAILED_RUN_ID" | jq -r '.inputVersions[0].version')
SUCCESSFUL_DATASET_VERSION=$(curl -s "http://localhost:5000/api/v1/jobs/runs/$SUCCESSFUL_RUN_ID" | jq -r '.inputVersions[0].version')

diff <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$FAILED_DATASET_VERSION" | \
      jq 'del(.["id", "version", "createdAt", "createdByRun"])') \
     <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$SUCCESSFUL_DATASET_VERSION" | \
      jq 'del(.["id", "version", "createdAt", "createdByRun"])')
58c58
<       "type": "VARCHAR",
---
>       "type": "INTEGER",

Hey! Somehow one of the fields was converted from an INTEGER to a VARCHAR! One of the helpful fields in the version API is the createdByRun, which is similar to the jobVersion's latestRun. It provides the job run that last altered the dataset, creating the new version.
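A full diff works, but you can also ask jq directly which columns changed type. This is a minimal sketch, assuming each dataset version exposes a fields array of name and type entries (which is what the diff above suggests). The post does not say which column changed, so the column names below are placeholders.

```shell
# Placeholder dataset versions; only the fields array is kept.
failed_ds=$(cat <<'EOF'
{"fields": [
  {"name": "order_id", "type": "VARCHAR"},
  {"name": "customer_email", "type": "VARCHAR"}
]}
EOF
)
good_ds=$(cat <<'EOF'
{"fields": [
  {"name": "order_id", "type": "INTEGER"},
  {"name": "customer_email", "type": "VARCHAR"}
]}
EOF
)

type_changes=$(jq -n -r \
  --argjson failed "$failed_ds" \
  --argjson good "$good_ds" '
    # Build a lookup of column name to type for the last good version.
    ($good.fields | map({key: .name, value: .type}) | from_entries) as $before
    # Report columns present in both versions whose type differs.
    | $failed.fields[]
    | select($before[.name] != null and $before[.name] != .type)
    | "\(.name): \($before[.name]) -> \(.type)"
  ')

echo "$type_changes"
```

Columns that were added or removed are ignored here; the sketch only reports type changes on columns both versions share.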

We can quickly compare the job versions of the runs that created these two dataset versions:

diff <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$FAILED_DATASET_VERSION" | \
      jq '.createdByRun.jobVersion') \
    <(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$SUCCESSFUL_DATASET_VERSION" | \
      jq '.createdByRun.jobVersion')
4c4
<   "version": "c222a72e-92cc-3bb6-b3b7-c174cbc76387"
---
>   "version": "76c375bf-58ac-3d19-b94f-424fe2784601"

And we can do a quick comparison of the two job versions. Since the job name is different, we'll let jq generate the endpoints for us:

diff <(curl -s $(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$FAILED_DATASET_VERSION" | \
      jq -r '.createdByRun.jobVersion | "http://localhost:5000/api/v1/namespaces/" + .namespace + "/jobs/" + .name + "/versions/" + .version') | \
      jq 'del(.["id", "version", "createdAt", "updatedAt", "latestRun"])') \
    <(curl -s $(curl -s "http://localhost:5000/api/v1/namespaces/food_delivery/datasets/public.delivery_7_days/versions/$SUCCESSFUL_DATASET_VERSION" | \
      jq -r '.createdByRun.jobVersion | "http://localhost:5000/api/v1/namespaces/" + .namespace + "/jobs/" + .name + "/versions/" + .version') | \
      jq 'del(.["id", "version", "createdAt", "updatedAt", "latestRun"])')
4c4
<   "location": "https://github.com/example/jobs/blob/c87f2a40553cfa4ae7178083a068bf1d0c6ca3a8/etl_delivery_7_days.py",
---
>   "location": "https://github.com/example/jobs/blob/4d0b5d374261fdaf60a1fc588dd8f0d124b0e87f/etl_delivery_7_days.py",

And there it is, because nearly 100% of the time, a job failure can be traced to a code change. In this example, the job immediately upstream changed the output schema of its dataset. In reality, it's not always so straightforward. Sometimes the upstream job is just a passthrough: maybe it applies some filters to a subset of the columns and writes out whatever schema it's given. In that case, the job immediately upstream would have succeeded without a change in the job version. Or the code change in the upstream job could be innocuous: maybe someone added a comment or fixed an unrelated bug. We might do some follow-up and discover we have to continue our search upstream.

But the Marquez API gives us that ability. Using the /lineage API, we can even explore the downstream impact of changes. So if you owned the etl_delivery_7_days job and wanted to see what impact changing the int to a varchar had on running jobs, the following recursive jq script will let you walk the downstream jobs and show the state of each one's last run:

# For readability, the jq filter is in a file broken into multiple lines
cat recurse.jq
  .graph as $graph | .graph[]
  | select(.id == "job:food_delivery:example.etl_delivery_7_days")
  | recurse(.outEdges[] | .destination as $nodeId | $graph[] | select(.id == $nodeId))
  | select(.type == "JOB")
  | {"id": .id, "state": .data.latestRun.state}

curl -s "http://localhost:5000/api/v1-beta/lineage?nodeId=job:food_delivery:example.etl_delivery_7_days" | jq -f recurse.jq | less
{
  "id": "job:food_delivery:example.etl_delivery_7_days",
  "state": "COMPLETED"
}
{
  "id": "job:food_delivery:example.delivery_times_7_days",
  "state": "FAILED"
}
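To try the traversal without a running server, here is a sketch that applies the same filter to a hand-built graph and keeps only the downstream jobs whose last run failed. The node shape contains just the fields the filter reads (id, type, data.latestRun.state, and outEdges with a destination); the dataset node id is an assumption that follows the same pattern as the job ids.

```shell
# Hand-built lineage graph: etl job -> dataset -> delivery_times job.
graph_json=$(cat <<'EOF'
{"graph": [
  {"id": "job:food_delivery:example.etl_delivery_7_days", "type": "JOB",
   "data": {"latestRun": {"state": "COMPLETED"}},
   "outEdges": [{"origin": "job:food_delivery:example.etl_delivery_7_days",
                 "destination": "dataset:food_delivery:public.delivery_7_days"}]},
  {"id": "dataset:food_delivery:public.delivery_7_days", "type": "DATASET",
   "data": {},
   "outEdges": [{"origin": "dataset:food_delivery:public.delivery_7_days",
                 "destination": "job:food_delivery:example.delivery_times_7_days"}]},
  {"id": "job:food_delivery:example.delivery_times_7_days", "type": "JOB",
   "data": {"latestRun": {"state": "FAILED"}},
   "outEdges": []}
]}
EOF
)

# Walk downstream from the etl job and keep only jobs whose last run failed.
broken_downstream=$(printf '%s\n' "$graph_json" | jq -r '
  .graph as $graph | .graph[]
  | select(.id == "job:food_delivery:example.etl_delivery_7_days")
  | recurse(.outEdges[] | .destination as $nodeId | $graph[] | select(.id == $nodeId))
  | select(.type == "JOB" and .data.latestRun.state == "FAILED")
  | .id
')

echo "$broken_downstream"
```

Note that the walk assumes the graph has no cycles; a cyclic graph would make recurse loop forever.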

In this post, we did everything manually with bash (because the shell is your most powerful tool when debugging a live outage you've never encountered before; and let's be honest, how many outages aren't something you've never encountered before?), but this could easily have been done in Java or Go or Python. The OpenAPI spec in the Marquez repo can be used to generate a client in whatever language you want to write your ops tool in. So build some tooling and help your next debugging session run a little more smoothly.

But wait! What about the times when the job isn't failing, but the data is wrong?

Ah, the data quality checks! This is where the extensibility of the OpenLineage model comes to our rescue, with a field in the responses that we completely glossed over:

      "facets": {}

But I think that's a topic for another post.