Java SDK for Lineage

The Java SDK provides a fluent API for managing data lineage in Collate. You can query lineage graphs, create edges between entities with column-level mapping, delete edges, export lineage, and run impact analysis. The fluent entry point is the Lineage class in org.openmetadata.sdk.api.
Lineage in Collate is a relationship between any two entities. While the examples below use tables and dashboards, the same API works with any entity type (pipelines, topics, ML models, etc.).

Setup

Before using the fluent API, initialize the client:
import org.openmetadata.sdk.api.Lineage;
import org.openmetadata.sdk.client.OpenMetadataClient;

OpenMetadataClient client = OpenMetadataClient.builder()
    .baseUrl("http://localhost:8585/api")
    .jwtToken("<YOUR-JWT-TOKEN>")
    .build();

Lineage.setDefaultClient(client);
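
Hard-coding the token works for a local test, but the value usually comes from the environment. The sketch below shows one way to resolve the two builder arguments. The variable names OPENMETADATA_URL and OPENMETADATA_JWT_TOKEN and the ClientSettings class are illustrative assumptions, not part of the SDK.

```java
import java.util.Map;

// Hypothetical helper: resolves the values to pass to baseUrl(...) and jwtToken(...).
final class ClientSettings {
    final String baseUrl;
    final String jwtToken;

    private ClientSettings(String baseUrl, String jwtToken) {
        this.baseUrl = baseUrl;
        this.jwtToken = jwtToken;
    }

    // Reads settings from an environment-style map (pass System.getenv() in real use).
    // The variable names are assumptions chosen for this example.
    static ClientSettings fromEnv(Map<String, String> env) {
        String url = env.getOrDefault("OPENMETADATA_URL", "http://localhost:8585/api");
        String token = env.get("OPENMETADATA_JWT_TOKEN");
        if (token == null || token.trim().isEmpty()) {
            throw new IllegalStateException("OPENMETADATA_JWT_TOKEN is not set");
        }
        return new ClientSettings(url, token);
    }
}
```

The resolved baseUrl and jwtToken fields would then be passed to OpenMetadataClient.builder() in place of the literals shown above.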

Querying Lineage

Retrieve the lineage graph for an entity by type and ID:
// Get lineage with default depth (1 upstream, 1 downstream)
Lineage.LineageGraph graph = Lineage.of("table", tableId).fetch();

// Get the raw JSON response
String raw = graph.getRaw();

Custom Depth

Control how many hops upstream and downstream are returned:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .upstream(3)
    .downstream(2)
    .fetch();
Or set both directions to the same value:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .depth(5)
    .fetch();

Include Deleted Entities

Include deleted entities in the returned graph:
Lineage.LineageGraph graph = Lineage.of("table", tableId)
    .includeDeleted(true)
    .fetch();

Adding Lineage

Use Lineage.connect() to create an edge between two entities:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("dashboard", dashboardId)
    .withDescription("Dashboard uses data from this table")
    .execute();

Column-Level Lineage

Map specific columns from the source entity to columns in the target entity:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .fromColumns("customer_id", "order_date")
    .to("table", targetTableId)
    .toColumns("cust_id", "date")
    .withDescription("ETL transformation")
    .execute();
Each target column in toColumns is mapped to every column in fromColumns, so in this example cust_id and date each list both customer_id and order_date as sources. The resulting payload includes a columnsLineage array inside lineageDetails.
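
To make the mapping rule concrete, the following sketch reproduces it with plain Java collections, without calling the SDK. The field names fromColumns and toColumn are an assumption about the shape of each columnsLineage entry, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: mimics the "every target column gets all source columns" rule.
final class ColumnMappingSketch {
    // Builds one entry per target column; every entry lists all source columns.
    static List<Map<String, Object>> columnsLineage(List<String> fromColumns,
                                                    List<String> toColumns) {
        List<Map<String, Object>> entries = new ArrayList<>();
        for (String target : toColumns) {
            Map<String, Object> entry = new LinkedHashMap<>();
            entry.put("fromColumns", new ArrayList<>(fromColumns)); // assumed field name
            entry.put("toColumn", target);                          // assumed field name
            entries.add(entry);
        }
        return entries;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> mapping = columnsLineage(
            List.of("customer_id", "order_date"),
            List.of("cust_id", "date"));
        // Prints one map per target column.
        mapping.forEach(System.out::println);
    }
}
```

Running main prints one entry per target column, which makes it easy to see that the rule produces a full cross-mapping rather than a positional one-to-one pairing.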

Pipeline Reference

Associate a pipeline entity that powers the transformation:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("table", targetTableId)
    .withPipeline("pipeline", pipelineId)
    .withDescription("Daily ETL job")
    .execute();

SQL Query

Attach the SQL query driving the lineage:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .to("table", targetTableId)
    .withSqlQuery("INSERT INTO target SELECT id, name FROM source")
    .execute();

Full Example

Combine all options:
Lineage.LineageEdge edge = Lineage.connect()
    .from("table", sourceTableId)
    .fromColumns("customer_id", "order_date")
    .to("table", targetTableId)
    .toColumns("cust_id", "date")
    .withPipeline("pipeline", pipelineId)
    .withDescription("ETL transformation")
    .withSqlQuery("SELECT customer_id, order_date FROM source")
    .execute();

Deleting Lineage

Remove a lineage edge between two entities:
Lineage.disconnect()
    .from("table", sourceTableId)
    .to("dashboard", dashboardId)
    .confirm();

Exporting Lineage

Export the lineage graph for a given entity by its fully qualified name:
String csv = Lineage.export()
    .entity("table", "service.database.schema.my_table")
    .upstream(3)
    .downstream(2)
    .execute();
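
Because the export comes back as a single String, saving it is an ordinary file write. Here is a minimal sketch using only the JDK. The CsvExportWriter class is hypothetical, and the column layout of the CSV is whatever the server returns, so none is assumed here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for persisting the String returned by the export call.
final class CsvExportWriter {
    // Writes the CSV text to the target path as UTF-8, creating parent directories if needed.
    static Path save(String csv, Path target) {
        try {
            Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.write(target, csv.getBytes(StandardCharsets.UTF_8));
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the csv variable from the snippet above, a call such as CsvExportWriter.save(csv, Paths.get("lineage.csv")) would write the export to disk.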

Impact Analysis

Analyze which downstream (or upstream) entities are affected by changes to a given entity:

Downstream Impact

Lineage.ImpactAnalysis impact = Lineage.impact()
    .of("table", tableId)
    .downstream()
    .depth(3)
    .analyze();

int totalImpacted = impact.getTotalImpactCount();

Upstream Impact

Lineage.ImpactAnalysis impact = Lineage.impact()
    .of("dashboard", dashboardId)
    .upstream()
    .depth(5)
    .analyze();

OpenLineage Events

The SDK also supports posting events in the OpenLineage standard format, such as those emitted by tools like Spark or Airflow, through the OpenLineage fluent API:
import org.openmetadata.sdk.fluent.OpenLineage;

OpenLineage.setDefaultClient(client);

// Post a single run event
String response = OpenLineage.event()
    .withEventType("COMPLETE")
    .withEventTime("2024-01-15T12:00:00Z")
    .withJob("my-etl-job", "my-namespace")
    .withRun("run-id-123")
    .addInput("source_table", "my-namespace")
    .addOutput("target_table", "my-namespace")
    .send();
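
The eventTime above is an ISO-8601 UTC timestamp. The sketch below produces such values with the JDK. The RunEventValues class is hypothetical. Using a random UUID as the run ID follows the OpenLineage specification's convention for run identifiers; whether this SDK enforces that format is not confirmed here.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

// Hypothetical helper for producing values to pass to withEventTime(...) and withRun(...).
final class RunEventValues {
    // Formats an instant as ISO-8601 UTC, truncated to whole seconds.
    static String eventTime(Instant instant) {
        return instant.truncatedTo(ChronoUnit.SECONDS).toString();
    }

    // Generates a fresh run ID; reuse the same value for every event of one run.
    static String newRunId() {
        return UUID.randomUUID().toString();
    }
}
```

For a live event, RunEventValues.eventTime(Instant.now()) would take the place of the literal timestamp.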

Batch Events

Build each event separately, then send them together with OpenLineage.batch():
// Build individual events
OpenLineage.RunEventBuilder event1 = OpenLineage.event()
    .withEventType("START")
    .withEventTime("2024-01-15T12:00:00Z")
    .withJob("job-1", "ns")
    .withRun("run-1");

OpenLineage.RunEventBuilder event2 = OpenLineage.event()
    .withEventType("COMPLETE")
    .withEventTime("2024-01-15T12:05:00Z")
    .withJob("job-1", "ns")
    .withRun("run-1");

// Send as a batch
String response = OpenLineage.batch()
    .addEvent(event1)
    .addEvent(event2)
    .send();

Static API

For simpler use cases, the LineageAPI class in org.openmetadata.sdk.api provides static methods that wrap the underlying HTTP calls:
import org.openmetadata.sdk.api.LineageAPI;

// Get lineage by entity FQN
String lineage = LineageAPI.getLineage("service.db.schema.table");

// Get lineage with depth
String lineageWithDepth = LineageAPI.getLineage("service.db.schema.table", "3", "2");

// Get lineage by entity type and ID
String entityLineage = LineageAPI.getEntityLineage("table", tableId);

// Add lineage (pass a Map matching the AddLineage JSON schema)
String addResult = LineageAPI.addLineage(lineageRequestMap);

// Delete lineage (each argument has the form "entityType:entityId")
String deleteResult = LineageAPI.deleteLineage("table:fromId", "dashboard:toId");

// Export lineage
String csv = LineageAPI.exportLineage("service.db.schema.table", "table", "3", "2");
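
The lineageRequestMap passed to addLineage is not defined in the snippet above. As a rough sketch, such a map could be assembled as follows. The nested keys (edge, fromEntity, toEntity, lineageDetails, description) are an assumption about the AddLineage schema and should be checked against the schema for your server version. The class and method names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative builder for a request map; key names are assumed, not confirmed by this page.
final class AddLineageRequestSketch {
    // Builds an entity reference of the form {"id": ..., "type": ...}.
    static Map<String, Object> ref(String type, String id) {
        Map<String, Object> ref = new LinkedHashMap<>();
        ref.put("id", id);
        ref.put("type", type);
        return ref;
    }

    // Builds {"edge": {"fromEntity": ..., "toEntity": ..., "lineageDetails": {...}}}.
    static Map<String, Object> build(String fromType, String fromId,
                                     String toType, String toId, String description) {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("description", description);

        Map<String, Object> edge = new LinkedHashMap<>();
        edge.put("fromEntity", ref(fromType, fromId));
        edge.put("toEntity", ref(toType, toId));
        edge.put("lineageDetails", details);

        Map<String, Object> request = new LinkedHashMap<>();
        request.put("edge", edge);
        return request;
    }
}
```

For most cases the fluent Lineage.connect() builder shown earlier is the less error-prone route, since it assembles the payload for you.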

Async Variants

All static methods have CompletableFuture-based async variants:
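import java.util.concurrent.CompletableFuture;
import org.openmetadata.sdk.api.LineageAPI;

// Fetch lineage asynchronously and print it when the call completes
CompletableFuture<String> future = LineageAPI.getLineageAsync("service.db.schema.table");
future.thenAccept(lineage -> System.out.println("Lineage: " + lineage));

// Add and delete lineage asynchronously
CompletableFuture<String> addFuture = LineageAPI.addLineageAsync(lineageRequestMap);
CompletableFuture<String> deleteFuture = LineageAPI.deleteLineageAsync("table:id1", "dashboard:id2");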
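
Since these are ordinary CompletableFuture values, the standard composition and error-handling operators apply. The sketch below uses stand-in suppliers in place of the LineageAPI calls so that it runs on its own. The class name, the fallback value, and the stand-in payloads are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative only: shows fallback and combination with plain CompletableFuture.
final class AsyncLineageSketch {
    // Runs a call asynchronously and substitutes a fallback value if it fails.
    static CompletableFuture<String> withFallback(Supplier<String> call, String fallback) {
        return CompletableFuture.supplyAsync(call).exceptionally(error -> fallback);
    }

    public static void main(String[] args) {
        // Stand-ins for two asynchronous lineage lookups; the second one fails.
        CompletableFuture<String> first = withFallback(() -> "{\"entity\":\"a\"}", "{}");
        CompletableFuture<String> second = withFallback(() -> {
            throw new IllegalStateException("simulated HTTP failure");
        }, "{}");

        // Wait for both results and combine them into one string.
        String combined = first.thenCombine(second, (a, b) -> a + "\n" + b).join();
        System.out.println(combined);
    }
}
```

With the real API, exceptionally(...) can be chained directly onto the future returned by an async method such as getLineageAsync, and join() or get() blocks until the result is available.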