Create a Pipeline

Create a new pipeline within a pipeline service.

Body Parameters

name (string, required)
Name of the pipeline. Must be unique within the parent pipeline service.

service (string, required)
Fully qualified name of the parent PipelineService (e.g., sample_airflow).

displayName (string)
Human-readable display name for the pipeline.

description (string)
Description of the pipeline in Markdown format.

sourceUrl (string)
URL to the pipeline definition in the source system (e.g., Airflow DAG URL).

tasks (array)
Array of pipeline tasks representing individual steps in the pipeline.

owners (array)
Array of owner references (users or teams) to assign to the pipeline.

domain (string)
Fully qualified name of the domain to assign for governance purposes.

tags (array)
Array of classification tags to apply to the pipeline.

extension (object)
Custom property values defined by your organization’s metadata schema.
POST /v1/pipelines
from metadata.sdk import configure
from metadata.sdk.entities import Pipelines
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest

# Point the SDK at your Collate instance and authenticate with a JWT.
configure(
    host="https://your-company.getcollate.io/api",
    jwt_token="your-jwt-token"
)

request = CreatePipelineRequest(
    name="dbt_analytics_customers",
    displayName="DBT Customer Analytics",
    service="sample_airflow",
    description="Analytics pipeline for customer data processing",
    sourceUrl="http://localhost:8080/tree?dag_id=dbt_analytics_customers",
    # downstreamTasks references chain the tasks into a DAG.
    tasks=[
        {
            "name": "extract_customers",
            "taskType": "BatchTask",
            "description": "Extract customer data from source",
            "downstreamTasks": ["transform_customers"]
        },
        {
            "name": "transform_customers",
            "taskType": "BatchTask",
            "description": "Transform and clean customer data",
            "downstreamTasks": ["load_customers"]
        },
        {
            "name": "load_customers",
            "taskType": "BatchTask",
            "description": "Load transformed data into warehouse"
        }
    ]
)

pipeline = Pipelines.create(request)
print(f"Created: {pipeline.fullyQualifiedName}")
{
  "id": "538faa63-d204-46ff-aead-d158d0401cac",
  "name": "dbt_analytics_customers",
  "displayName": "DBT Customer Analytics",
  "fullyQualifiedName": "sample_airflow.dbt_analytics_customers",
  "description": "Analytics pipeline for customer data processing",
  "version": 0.1,
  "updatedAt": 1769982668397,
  "updatedBy": "admin",
  "sourceUrl": "http://localhost:8080/tree?dag_id=dbt_analytics_customers",
  "service": {
    "id": "daa58a49-df05-48a3-a417-45dfd12eacf5",
    "type": "pipelineService",
    "name": "sample_airflow",
    "fullyQualifiedName": "sample_airflow",
    "deleted": false
  },
  "serviceType": "Airflow",
  "tasks": [
    {
      "name": "extract_customers",
      "taskType": "BatchTask",
      "description": "Extract customer data from source",
      "downstreamTasks": ["transform_customers"]
    },
    {
      "name": "transform_customers",
      "taskType": "BatchTask",
      "description": "Transform and clean customer data",
      "downstreamTasks": ["load_customers"]
    },
    {
      "name": "load_customers",
      "taskType": "BatchTask",
      "description": "Load transformed data into warehouse"
    }
  ],
  "href": "http://localhost:8585/api/v1/pipelines/538faa63-d204-46ff-aead-d158d0401cac",
  "deleted": false,
  "owners": [],
  "tags": [],
  "followers": [],
  "votes": {
    "upVotes": 0,
    "downVotes": 0
  },
  "domains": []
}
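The downstreamTasks entries in the request chain the three tasks into a linear DAG (extract, then transform, then load). A quick client-side sanity check, using a hypothetical helper, that every downstreamTasks reference names a task defined in the same array:

```python
def validate_task_graph(tasks: list) -> bool:
    # Every downstreamTasks entry must name another task in the same
    # array; a dangling reference would produce a broken task graph.
    names = {t["name"] for t in tasks}
    return all(d in names
               for t in tasks
               for d in t.get("downstreamTasks", []))

tasks = [
    {"name": "extract_customers", "downstreamTasks": ["transform_customers"]},
    {"name": "transform_customers", "downstreamTasks": ["load_customers"]},
    {"name": "load_customers"},
]
print(validate_task_graph(tasks))  # True
```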

Returns

Returns the created pipeline object with all specified properties and system-generated fields.

Response

id (string)
Unique identifier for the pipeline (UUID format).

name (string)
Pipeline name.

fullyQualifiedName (string)
Fully qualified name in the format service.pipelineName.

displayName (string)
Human-readable display name.

description (string)
Description of the pipeline in Markdown format.

sourceUrl (string)
URL to the pipeline in the source system.

service (object)
Reference to the parent pipeline service.

serviceType (string)
Type of pipeline service (e.g., Airflow, Dagster, DBTCloud).

tasks (array)
List of tasks in the pipeline.

owners (array)
List of owners assigned to the pipeline.

domains (array)
Domain assignments for governance.

tags (array)
Classification tags applied to the pipeline.

extension (object)
Custom property values defined by your organization’s metadata schema.

version (number)
Version number for the entity (starts at 0.1).
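Because fullyQualifiedName follows the service.pipelineName pattern, the parent service can be recovered from it. A minimal sketch (the helper is illustrative, not part of the SDK):

```python
def split_pipeline_fqn(fqn: str):
    # "<service>.<pipelineName>": split on the first dot only, since
    # the name portion could itself contain further dots.
    service, _, name = fqn.partition(".")
    return service, name

print(split_pipeline_fqn("sample_airflow.dbt_analytics_customers"))
# ('sample_airflow', 'dbt_analytics_customers')
```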

Create or Update (PUT)

Use PUT /v1/pipelines instead of POST to perform an upsert. If a pipeline with the same fullyQualifiedName already exists, it will be updated; otherwise, a new pipeline is created. The request body is the same as POST.
curl -X PUT "{base_url}/api/v1/pipelines" \
  -H "Authorization: Bearer {access_token}" \
  -H "Content-Type: application/json" \
  -d '{ ... same body as POST ... }'
PUT will not return a 409 conflict error if the entity already exists — it will update the existing entity instead.
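The same upsert can be issued from Python's standard library; the host and token below are placeholders for your own instance, and the request is built but not sent:

```python
import json
import urllib.request

body = json.dumps({
    "name": "dbt_analytics_customers",
    "service": "sample_airflow",
}).encode("utf-8")

# Build (but do not yet send) the PUT request; pass it to
# urllib.request.urlopen(req) to execute the upsert.
req = urllib.request.Request(
    url="https://your-company.getcollate.io/api/v1/pipelines",
    data=body,
    method="PUT",
    headers={
        "Authorization": "Bearer your-jwt-token",
        "Content-Type": "application/json",
    },
)
print(req.get_method(), req.full_url)
```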

Bulk Create or Update (PUT)

Use PUT /v1/pipelines/bulk to create or update multiple pipelines in a single request. The request body is an array of create request objects.
curl -X PUT "{base_url}/api/v1/pipelines/bulk" \
  -H "Authorization: Bearer {access_token}" \
  -H "Content-Type: application/json" \
  -d '[
    { "name": "pipeline_one", "service": "sample_airflow" },
    { "name": "pipeline_two", "service": "sample_airflow" }
  ]'
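The bulk body is simply an array of the same create-request objects; a small helper (ours, not the SDK's) that builds one for several pipelines in the same service:

```python
import json

def bulk_pipeline_payload(service: str, names: list) -> str:
    # One create-request object per pipeline; only required fields here.
    return json.dumps([{"name": n, "service": service} for n in names])

print(bulk_pipeline_payload("sample_airflow", ["pipeline_one", "pipeline_two"]))
```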

Error Handling

Code  Error Type             Description
400   BAD_REQUEST            Invalid request body or missing required fields
401   UNAUTHORIZED           Invalid or missing authentication token
403   FORBIDDEN              User lacks permission to create pipelines
409   ENTITY_ALREADY_EXISTS  Pipeline with the same name already exists in the service (POST only)
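A client-side reaction to these codes can be kept in one place. The handler below is a sketch, not part of the SDK; its 409 branch falls back to the PUT upsert described above:

```python
def handle_create_error(status: int) -> str:
    # Map each documented error code to a suggested client action.
    if status == 400:
        return "fix request body"           # BAD_REQUEST
    if status == 401:
        return "refresh JWT token"          # UNAUTHORIZED
    if status == 403:
        return "request create permission"  # FORBIDDEN
    if status == 409:
        return "retry with PUT upsert"      # ENTITY_ALREADY_EXISTS (POST only)
    return "unexpected error"

print(handle_create_error(409))  # retry with PUT upsert
```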