> ## Documentation Index
> Fetch the complete documentation index at: https://docs.getcollate.io/llms.txt
> Use this file to discover all available pages before exploring further.

# OpenLineage | Collate Data Lineage Pipeline

> Connect your data pipelines with Collate's OpenLineage connector. Track data lineage, monitor pipeline metadata, and gain end-to-end visibility.

export const MetadataIngestionUi = ({connector, selectServicePath, addNewServicePath, serviceConnectionPath}) => {
  return <>
   <p>
      To ingest metadata from your sources, you need to create a service connection.
      The service connects your source system with Collate. Once you create
      a service, you can use it to configure your ingestion workflows.<br />
      <br />
      To create a service connection and ingest your metadata, follow the steps below:
  </p>
  <Steps>
    <Step title="Select the Service">
    <ol>
          <li>
            On the left navigation bar, click <strong>Settings</strong>.
          </li>
          <li>
            On the next page, click <strong>Services</strong>, and then select the service.
            <img src="/public/images/connectors/visit-services-page.png" alt="Visit Services Page" />
          </li>
    </ol>
   </Step>


   <Step title="Create a New Service">
      
       To add a new service connection, click <strong>Add New Service</strong>.
      <img src="/public/images/connectors/create-new-service.png" alt="Create a new Service" />


   </Step>


     <Step title="Select the Connector">
       Select <strong>{connector}</strong> as the service type and click <strong>Next</strong>.


       {selectServicePath && <img src={selectServicePath} alt="Select Service" />}
   </Step>


   <Step title="Name and Describe your Service">
       Enter a unique, descriptive <strong>Service Name</strong> and <strong>Description</strong>.
       <ul>
         <li><strong>Service Name</strong>: Collate identifies services by their service name. Enter a name that distinguishes this deployment from other services, including other {connector} services you are ingesting metadata from.</li>
       </ul>


       <Note>
           The service name cannot be changed after it is set.
       </Note>


       {addNewServicePath && <img src={addNewServicePath} alt="Add New Service" />}
   </Step>


   <Step title="Configure the Service Connection">
       Set up the connection settings required for {connector}. <br /><br />
      
       Configure the following connection options to set up the service and start ingesting metadata from your sources. The right-hand panel displays help documentation for the selected connection type in the product UI

       {serviceConnectionPath && <img src={serviceConnectionPath} alt="Configure Service connection" />}
   </Step>
   </Steps>
   </>;
};

export const ConnectorDetailsHeader = ({name, icon, stage, availableFeatures, unavailableFeatures = [], availableFeaturesCollate = []}) => {
  const showSubHeading = availableFeatures?.length > 0 || unavailableFeatures?.length > 0 || availableFeaturesCollate?.length > 0;
  const totalAvailableFeatures = [...availableFeatures || [], ...availableFeaturesCollate || []];
  return <div className="container">
      <div className="Heading">
        <div className="flex items-center gap-3">
          {icon && <div className="IconContainer">
              <img src={icon} alt={name} noZoom className="ConnectorIcon" />
            </div>}
          <h1 className="ConnectorName">{name}</h1>
          <span className={`StageBadge ${stage === 'PROD' ? 'prod' : 'beta'}`}>
            {stage}
          </span>
        </div>
      </div>
      {showSubHeading && <div className="SubHeading">
          <div className="FeaturesHeading">Feature List</div>
          <div className="FeaturesList">
            {totalAvailableFeatures.map(feature => <div className="FeatureTag AvailableFeature" key={feature}>
                ✓ {feature}
              </div>)}
            {unavailableFeatures.map(feature => <div className="FeatureTag UnavailableFeature" key={feature}>
                ✕ {feature}
              </div>)}
          </div>
        </div>}
    </div>;
};

<ConnectorDetailsHeader icon="/public/images/connectors/openlineage.png" name="OpenLineage" stage="PROD" availableFeatures={["Pipelines", "Lineage", "Usage"]} unavailableFeatures={["Pipeline Status", "Owners", "Tags"]} />

In this section, we provide guides and references to use the OpenLineage connector.
Configure and schedule OpenLineage metadata workflows from the Collate UI:

* [Requirements](#requirements)
* [Metadata Ingestion](#metadata-ingestion)
* [Troubleshooting](/connectors/pipeline/openlineage/troubleshooting)

## Requirements

Collate is integrated with OpenLineage up to version 1.7.0 and will continue to work for future OpenLineage versions.
OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
Apart from being a specification, it is also a set of integrations collecting lineage from various systems such as Apache Airflow and Spark.

### OpenLineage connector events

OpenLineage connector consumes OpenLineage events from either a **Kafka broker** or **AWS Kinesis Data Streams** and translates them to Collate lineage information.

### Kafka configuration

#### Airflow OpenLineage events (Kafka)

To configure your Airflow instance:

1. Install the appropriate provider in [Airflow](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html)
2. Configure the OpenLineage provider in [Airflow](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#using-openlineage-integration)
   * Use `kafka` transport mode because OpenLineage events are collected from the Kafka topic. Detailed configuration options are available [here](https://openlineage.io/docs/client/python/#configuration).

#### Spark OpenLineage events (Kafka)

Configure your Spark session to produce OpenLineage events compatible with the Collate connector:

```python theme={null}
from pyspark.sql import SparkSession
from uuid import uuid4

spark = SparkSession.builder\
    .config('spark.openlineage.namespace', 'mynamespace')\
    .config('spark.openlineage.parentJobName', 'hello-world')\
    .config('spark.openlineage.parentRunId', str(uuid4()))\
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark:1.7.0')\
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')\
    .config('spark.openlineage.transport.type', 'kafka')\
    .getOrCreate()
```

### AWS Kinesis configuration

The OpenLineage connector also supports consuming events from AWS Kinesis Data Streams. This is useful when your data pipelines publish OpenLineage events to Kinesis instead of Kafka.

#### Kinesis requirements

* An AWS Kinesis Data Stream receiving OpenLineage events
* AWS credentials with permissions to read from the Kinesis stream:
  * `kinesis:GetRecords`
  * `kinesis:GetShardIterator`
  * `kinesis:DescribeStream`
  * `kinesis:ListShards`

#### Spark OpenLineage events (Kinesis)

Configure your Spark session to produce OpenLineage events to Kinesis:

```python theme={null}
from pyspark.sql import SparkSession
from uuid import uuid4

spark = SparkSession.builder\
    .config('spark.openlineage.namespace', 'mynamespace')\
    .config('spark.openlineage.parentJobName', 'hello-world')\
    .config('spark.openlineage.parentRunId', str(uuid4()))\
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark:1.7.0')\
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')\
    .config('spark.openlineage.transport.type', 'kinesis')\
    .config('spark.openlineage.transport.streamName', 'openlineage-events')\
    .config('spark.openlineage.transport.region', 'us-east-2')\
    .getOrCreate()
```

## Metadata Ingestion

<MetadataIngestionUi connector={"OpenLineage"} selectServicePath={"/public/images/connectors/openlineage/select-service.png"} addNewServicePath={"/public/images/connectors/openlineage/add-new-service.png"} serviceConnectionPath={"/public/images/connectors/openlineage/service-connection.png"} />

## Connection Details

<Steps>
  <Step title="Test the Connection">
    Once the credentials have been added, click on *Test Connection* and *Save* the changes.

    <img src="https://mintcdn.com/collatedocs/L7psA65ao88vmcRI/public/images/connectors/test-connection.png?fit=max&auto=format&n=L7psA65ao88vmcRI&q=85&s=2133f0d65f18df1e57f69d2cc3bdeff4" alt="Test Connection" width="1494" height="310" data-path="public/images/connectors/test-connection.png" />
  </Step>

  <Step title="Configure Metadata Ingestion">
    In this step we will configure the metadata ingestion pipeline,
    Please follow the instructions below

    <img src="https://mintcdn.com/collatedocs/12mtkhFtvTP7FUxZ/public/images/connectors/configure-metadata-ingestion-pipeline.png?fit=max&auto=format&n=12mtkhFtvTP7FUxZ&q=85&s=b310e691eaec850bc9eda3ed0b8a340f" alt="Configure Metadata Ingestion" width="1508" height="1614" data-path="public/images/connectors/configure-metadata-ingestion-pipeline.png" />

    #### Metadata Ingestion Options

    * **Name**: This field refers to the name of ingestion pipeline, you can customize the name or use the generated name.
    * **Pipeline Filter Pattern (Optional)**: Use to pipeline filter patterns to control whether or not to include pipeline as part of metadata ingestion.
      * **Include**: Explicitly include pipeline by adding a list of comma-separated regular expressions to the Include field. OpenMetadata will include all pipeline with names matching one or more of the supplied regular expressions. All other schemas will be excluded.
      * **Exclude**: Explicitly exclude pipeline by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all pipeline with names matching one or more of the supplied regular expressions. All other schemas will be included.
    * **Include lineage (toggle)**: Set the Include lineage toggle to control whether to include lineage between pipelines and data sources as part of metadata ingestion.
    * **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug.
    * **Mark Deleted Pipelines (toggle)**: Set the Mark Deleted Pipelines toggle to flag pipelines as soft-deleted if they are not present anymore in the source system.
  </Step>

  <Step title="Schedule the Ingestion and Deploy">
    Scheduling can be set up at an hourly, daily, weekly, or manual cadence. The
    timezone is in UTC. Select a Start Date to schedule for ingestion. It is
    optional to add an End Date.

    Review your configuration settings. If they match what you intended,
    click Deploy to create the service and schedule metadata ingestion.

    If something doesn't look right, click the Back button to return to the
    appropriate step and change the settings as needed.

    After configuring the workflow, you can click on Deploy to create the
    pipeline.

    <img src="https://mintcdn.com/collatedocs/piJyXg9wW6Ik1lg-/public/images/connectors/schedule.png?fit=max&auto=format&n=piJyXg9wW6Ik1lg-&q=85&s=f1add591824b44456f0e2ff259a21c6f" alt="Schedule the Workflow" width="2733" height="1083" data-path="public/images/connectors/schedule.png" />
  </Step>

  <Step title="View the Ingestion Pipeline">
    Once the workflow has been successfully deployed, you can view the
    Ingestion Pipeline running from the Service Page.

    <img src="https://mintcdn.com/collatedocs/cOe_QuHYxAbkMtTI/public/images/connectors/view-ingestion-pipeline.png?fit=max&auto=format&n=cOe_QuHYxAbkMtTI&q=85&s=8c754af74f99ee70e714f6f707b827e4" alt="View Ingestion Pipeline" width="2733" height="1271" data-path="public/images/connectors/view-ingestion-pipeline.png" />

    <Tip>
      If [AutoPilot](/how-to-guides/admin-guide/applications/autopilot) is enabled, workflows like usage tracking, data lineage, and similar tasks will be handled automatically. Users don’t need to set up or manage them - AutoPilot takes care of everything in the system.
    </Tip>
  </Step>
</Steps>

### Providing connection details programmatically via API

#### 1. Preparing the Client

```python theme={null}
from metadata.sdk import configure

configure(host="http://localhost:8585/api", jwt_token="<token>")
```

#### 2. Creating the OpenLineage pipeline service (Kafka)

```python theme={null}
from metadata.sdk import client
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceRequest
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineServiceType,
    PipelineConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
    OpenLineageConnection,
    KafkaBrokerConfig,
    SecurityProtocol as KafkaSecurityProtocol,
    ConsumerOffsets
)
from metadata.generated.schema.security.ssl.validateSSLClientConfig import (
    ValidateSslClientConfig,
)

openlineage_service_request = CreatePipelineServiceRequest(
    name='openlineage-kafka-service',
    displayName='OpenLineage Kafka Service',
    serviceType=PipelineServiceType.OpenLineage,
    connection=PipelineConnection(
        config=OpenLineageConnection(
            brokerConfig=KafkaBrokerConfig(
                brokersUrl='broker1:9092,broker2:9092',
                topicName='openlineage-events',
                consumerGroupName='openmetadata-consumer',
                consumerOffsets=ConsumerOffsets.earliest,
                poolTimeout=3.0,
                sessionTimeout=60,
                securityProtocol=KafkaSecurityProtocol.SSL,
                sslConfig=ValidateSslClientConfig(
                    sslCertificate='/path/to/kafka/certs/Certificate.pem',
                    sslKey='/path/to/kafka/certs/Key.pem',
                    caCertificate='/path/to/kafka/certs/RootCA.pem'
                )
            )
        )
    ),
)
client().ometa.create_or_update(openlineage_service_request)
```

#### 3. Creating the OpenLineage pipeline service (Kinesis)

```python theme={null}
from metadata.sdk import client
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceRequest
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineServiceType,
    PipelineConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
    OpenLineageConnection,
    KinesisBrokerConfig,
    KinesisConsumerOffsets
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials

openlineage_service_request = CreatePipelineServiceRequest(
    name='openlineage-kinesis-service',
    displayName='OpenLineage Kinesis Service',
    serviceType=PipelineServiceType.OpenLineage,
    connection=PipelineConnection(
        config=OpenLineageConnection(
            brokerConfig=KinesisBrokerConfig(
                streamName='openlineage-events',
                consumerOffsets=KinesisConsumerOffsets.TRIM_HORIZON,
                poolTimeout=1.0,
                sessionTimeout=30,
                awsConfig=AWSCredentials(
                    awsRegion='us-east-2',
                    awsAccessKeyId='<your-access-key>',
                    awsSecretAccessKey='<your-secret-key>',
                    # awsSessionToken='<session-token>',
                )
            )
        )
    ),
)
client().ometa.create_or_update(openlineage_service_request)
```

## Troubleshooting

<Columns cols={2}>
  <Card title="OpenLineage Troubleshooting" href="/connectors/pipeline/openlineage/troubleshooting">
    Learn more about how to troubleshoot common OpenLineage connector issues and resolve configuration or ingestion errors.
  </Card>
</Columns>
