Pipelines
The Pipelines class provides CRUD operations for data pipeline metadata (Airflow, dbt, Fivetran, etc.).
Fully qualified name (FQN) format: service_name.pipeline_name
Import
Copy
Ask AI
from metadata.sdk import configure, Pipelines
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
configure(host="https://your-instance.getcollate.io/api", jwt_token="your-token")
Create
Copy
Ask AI
pipeline = Pipelines.create(
CreatePipelineRequest(
name="daily_customer_sync",
service="my_airflow",
description="Syncs customer records daily from CRM to warehouse",
sourceUrl="https://airflow.example.com/dags/daily_customer_sync",
)
)
print(pipeline.fullyQualifiedName) # "my_airflow.daily_customer_sync"
Retrieve
Copy
Ask AI
# By ID
pipeline = Pipelines.retrieve("pipeline-uuid")
# By FQN
pipeline = Pipelines.retrieve_by_name("my_airflow.daily_customer_sync")
# With fields
pipeline = Pipelines.retrieve_by_name(
"my_airflow.daily_customer_sync",
fields=["owners", "tags", "tasks"],
)
List
Copy
Ask AI
# Single page
page = Pipelines.list(limit=25)
# All pipelines
all_pipelines = Pipelines.list_all()
# Filter by service
pipelines = Pipelines.list_all(filters={"service": "my_airflow"})
Update
Copy
Ask AI
pipeline = Pipelines.retrieve_by_name("my_airflow.daily_customer_sync")
pipeline.description = "Updated pipeline description"
Pipelines.update(pipeline)
Delete
Copy
Ask AI
Pipelines.delete("pipeline-uuid")
Pipelines.delete("pipeline-uuid", hard_delete=True)
Search
Copy
Ask AI
results = Pipelines.search("customer_sync", size=10)
for pipeline in results:
    print(pipeline.fullyQualifiedName)
Version History
Copy
Ask AI
versions = Pipelines.get_versions("pipeline-uuid")
v = Pipelines.get_specific_version("pipeline-uuid", "0.2")