Skip to main content

Pipelines

The Pipelines class provides CRUD operations for data pipeline metadata (Airflow, dbt, Fivetran, etc.). Pipelines are addressed by their fully qualified name (FQN), which has the format service_name.pipeline_name.

Import

from metadata.sdk import configure, Pipelines
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest

configure(host="https://your-instance.getcollate.io/api", jwt_token="your-token")

Create

pipeline = Pipelines.create(
    CreatePipelineRequest(
        name="daily_customer_sync",
        service="my_airflow",
        description="Syncs customer records daily from CRM to warehouse",
        sourceUrl="https://airflow.example.com/dags/daily_customer_sync",
    )
)
print(pipeline.fullyQualifiedName)  # "my_airflow.daily_customer_sync"

Retrieve

# By ID
pipeline = Pipelines.retrieve("pipeline-uuid")

# By FQN
pipeline = Pipelines.retrieve_by_name("my_airflow.daily_customer_sync")

# With fields
pipeline = Pipelines.retrieve_by_name(
    "my_airflow.daily_customer_sync",
    fields=["owners", "tags", "tasks"],
)

List

# Single page
page = Pipelines.list(limit=25)

# All pipelines
all_pipelines = Pipelines.list_all()

# Filter by service
pipelines = Pipelines.list_all(filters={"service": "my_airflow"})

Update

pipeline = Pipelines.retrieve_by_name("my_airflow.daily_customer_sync")
pipeline.description = "Updated pipeline description"
Pipelines.update(pipeline)

Delete

Pipelines.delete("pipeline-uuid")
Pipelines.delete("pipeline-uuid", hard_delete=True)

Search

results = Pipelines.search("customer_sync", size=10)
for pipeline in results:
    print(pipeline.fullyQualifiedName)

Version History

versions = Pipelines.get_versions("pipeline-uuid")
v = Pipelines.get_specific_version("pipeline-uuid", "0.2")