Skip to main content
PUT
https://sandbox.getcollate.io/api
/
v1
/
lineage
PUT /v1/lineage
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
    ColumnLineage,
    EntitiesEdge,
    LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

server_config = OpenMetadataConnection(
    hostPort="https://your-company.getcollate.io/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR-JWT-TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)

# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        description="ETL transformation",
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# With column-level lineage and SQL query
column_lineage = ColumnLineage(
    fromColumns=["service.db.schema.source_table.customer_id"],
    toColumn="service.db.schema.target_table.cust_id",
)

lineage_details = LineageDetails(
    sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)

add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
    database_service=database_service,
    sql="INSERT INTO target_table(id) SELECT id FROM source_table",
    timeout=200,
)
{
  "entity": {
    "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
    "type": "table",
    "name": "source_table",
    "fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
  },
  "nodes": [
    {
      "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "type": "table",
      "name": "target_table",
      "fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "upstreamEdges": [],
  "downstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "lineageDetails": {
        "sqlQuery": "INSERT INTO target SELECT id, name FROM source",
        "columnsLineage": [
          {
            "fromColumns": [
              "sample_data.ecommerce_db.shopify.source_table.id"
            ],
            "toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
          }
        ]
      }
    }
  ]
}

Add Lineage

Create a lineage edge between two entities. Supports column-level lineage mappings, pipeline references, and SQL query annotations.

Request Body

edge
object
required
The lineage edge to create.
PUT /v1/lineage
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import (
    ColumnLineage,
    EntitiesEdge,
    LineageDetails,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

server_config = OpenMetadataConnection(
    hostPort="https://your-company.getcollate.io/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR-JWT-TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)

# Simple lineage between two tables
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        description="ETL transformation",
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# With column-level lineage and SQL query
column_lineage = ColumnLineage(
    fromColumns=["service.db.schema.source_table.customer_id"],
    toColumn="service.db.schema.target_table.cust_id",
)

lineage_details = LineageDetails(
    sqlQuery="INSERT INTO target_table SELECT customer_id FROM source_table",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)

add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_table.id, type="table"),
        toEntity=EntityReference(id=target_table.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)

# Automated SQL lineage (parses SQL to extract lineage)
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service = metadata.get_by_name(entity=DatabaseService, fqn="my_service")
metadata.add_lineage_by_query(
    database_service=database_service,
    sql="INSERT INTO target_table(id) SELECT id FROM source_table",
    timeout=200,
)
{
  "entity": {
    "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
    "type": "table",
    "name": "source_table",
    "fullyQualifiedName": "sample_data.ecommerce_db.shopify.source_table",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
  },
  "nodes": [
    {
      "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "type": "table",
      "name": "target_table",
      "fullyQualifiedName": "sample_data.ecommerce_db.shopify.target_table",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "upstreamEdges": [],
  "downstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "lineageDetails": {
        "sqlQuery": "INSERT INTO target SELECT id, name FROM source",
        "columnsLineage": [
          {
            "fromColumns": [
              "sample_data.ecommerce_db.shopify.source_table.id"
            ],
            "toColumn": "sample_data.ecommerce_db.shopify.target_table.id"
          }
        ]
      }
    }
  ]
}

Returns

Returns the lineage graph for the fromEntity node, including the newly created edge.

Response

entity
object
The entity reference for the source node of the created lineage edge.
nodes
array
List of entity references for all connected nodes.
upstreamEdges
array
Edges pointing into the source entity.
downstreamEdges
array
Edges pointing away from the source entity, including the newly created edge.

Error Handling

CodeError TypeDescription
400BAD_REQUESTInvalid request body or missing required fields
401UNAUTHORIZEDInvalid or missing authentication token
403FORBIDDENUser lacks permission to create lineage
404NOT_FOUNDOne of the referenced entities does not exist