Run the ingestion from GCP Composer
Requirements
This approach was last tested against:
- Composer version 2.5.4
- Airflow version 2.6.3
- openmetadata-ingestion==1.3.1.0
Using the Python Operator
The most convenient way to run the metadata workflows from GCP Composer is directly via a PythonOperator. Note that
this requires you to install the packages and plugins directly on the host.
Install the Requirements
In your environment you will need to install the following packages:
- openmetadata-ingestion[<plugins>]==x.y.z
- sqlalchemy==1.4.27: This is needed to align the OpenMetadata version with the Composer internal requirements.
x.y.z is the version of the OpenMetadata ingestion package. Note that the version needs to match the server version. If we are using the server at 1.1.0, then the ingestion package needs to also be 1.1.0.
The plugin parameter is a comma-separated list of the sources that we want to ingest. An example would look like this: openmetadata-ingestion[mysql,snowflake,s3]==1.1.0.
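The requirement strings can also be assembled programmatically, for instance when generating the package list for the environment. The following is a small sketch; the helper name `ingestion_requirement` is hypothetical and not part of any OpenMetadata tooling.

```python
def ingestion_requirement(server_version: str, plugins: list[str]) -> str:
    """Build the pip requirement for a given server version and source plugins."""
    extras = ",".join(plugins)
    # Without plugins there are no brackets in the requirement.
    name = f"openmetadata-ingestion[{extras}]" if extras else "openmetadata-ingestion"
    # The ingestion package version must match the server version.
    return f"{name}=={server_version}"


requirements = [
    ingestion_requirement("1.1.0", ["mysql", "snowflake", "s3"]),
    "sqlalchemy==1.4.27",  # pinned to align with the Composer internal requirements
]
print("\n".join(requirements))
```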
Prepare the DAG!
Note that this DAG is a regular connector DAG, just using the Airflow service with the Backend connection.
As an example of a DAG pushing data to OpenMetadata under Google SSO, we could have:
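A minimal sketch of such a DAG follows. Every value in the configuration is a placeholder, and the `securityConfig` keys for Google SSO in particular are an assumption that should be checked against the security documentation for your OpenMetadata version. The DAG id, task id and function names are hypothetical.

```python
from datetime import datetime

# Workflow configuration as a Python dict (the same structure as the YAML config).
# All values are placeholders; the securityConfig content is an assumption.
CONFIG = {
    "source": {
        "type": "airflow",
        "serviceName": "airflow_gcp_composer",
        "serviceConnection": {
            "config": {
                "type": "Airflow",
                "hostPort": "http://localhost:8080",
                # Read the Airflow metadata through the backend connection.
                "connection": {"type": "Backend"},
            }
        },
        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "https://openmetadata.example.com/api",
            "authProvider": "google",
            "securityConfig": {"secretKey": "/home/airflow/gcs/data/credentials.json"},
        }
    },
}


def metadata_ingestion_workflow():
    # Imported inside the task so that parsing the DAG file stays cheap.
    from metadata.workflow.metadata import MetadataWorkflow

    workflow = MetadataWorkflow.create(CONFIG)
    workflow.execute()
    workflow.raise_from_status()  # fail the Airflow task if the workflow failed
    workflow.stop()


def build_dag():
    # Airflow imports are kept local so the sketch parses without Airflow installed.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="openmetadata_airflow_ingestion",
        start_date=datetime(2024, 1, 1),
        schedule=None,  # trigger manually; set a cron expression as needed
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="ingest_using_recipe",
            python_callable=metadata_ingestion_workflow,
        )
    return dag
```

In the DAG file deployed to Composer, assign `dag = build_dag()` at module level so that the scheduler discovers it.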
Ingestion Workflow classes
We have different classes for different types of workflows. The logic is always the same, but you will need to change your import path. The rest of the method calls remain the same. The import for each workflow type is:
- Metadata: from metadata.workflow.metadata import MetadataWorkflow
- Lineage: from metadata.workflow.metadata import MetadataWorkflow (same as metadata)
- Usage: from metadata.workflow.usage import UsageWorkflow
- dbt: from metadata.workflow.metadata import MetadataWorkflow
- Profiler: from metadata.workflow.profiler import ProfilerWorkflow
- Data Quality: from metadata.workflow.data_quality import TestSuiteWorkflow
- Data Insights: from metadata.workflow.data_insight import DataInsightWorkflow
- Elasticsearch Reindex: from metadata.workflow.metadata import MetadataWorkflow (same as metadata)
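Since only the import path changes between workflow types, a single task callable can cover all of them. The sketch below assumes the import paths listed above; the dictionary keys and the function names `load_workflow_class` and `run_workflow` are hypothetical labels chosen for illustration.

```python
import importlib

# Module path and class name for each workflow type.
WORKFLOW_CLASSES = {
    "metadata": ("metadata.workflow.metadata", "MetadataWorkflow"),
    "lineage": ("metadata.workflow.metadata", "MetadataWorkflow"),
    "usage": ("metadata.workflow.usage", "UsageWorkflow"),
    "dbt": ("metadata.workflow.metadata", "MetadataWorkflow"),
    "profiler": ("metadata.workflow.profiler", "ProfilerWorkflow"),
    "data_quality": ("metadata.workflow.data_quality", "TestSuiteWorkflow"),
    "data_insight": ("metadata.workflow.data_insight", "DataInsightWorkflow"),
    "elasticsearch_reindex": ("metadata.workflow.metadata", "MetadataWorkflow"),
}


def load_workflow_class(kind: str):
    """Import the workflow class for the given type only when it is needed."""
    module_path, class_name = WORKFLOW_CLASSES[kind]
    return getattr(importlib.import_module(module_path), class_name)


def run_workflow(kind: str, config: dict) -> None:
    """Run any workflow type; the method calls are the same for all of them."""
    workflow = load_workflow_class(kind).create(config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.stop()
```

A PythonOperator could then call `run_workflow` with `op_kwargs={"kind": "profiler", "config": ...}`.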
Using the Kubernetes Pod Operator
In this second approach we won’t need to install anything in the GCP Composer environment. Instead, we will rely on the KubernetesPodOperator to use the underlying k8s cluster of Composer.
The code then won’t run directly in the host’s environment, but rather inside a container that we created
with only the openmetadata-ingestion package.
Note: The openmetadata/ingestion-base image that this approach relies on is only available from version 0.12.1 onwards!
Prepare the DAG!
Kubernetes Pod Operator
You can name the task as you want (task_id and name). The important points here are the cmds, which should not
be changed, and the env_vars. The main.py script that gets shipped within the image will load the env vars
as they are shown, so only modify the content of the config YAML, not the dictionary itself.
Note that the example uses the image openmetadata/ingestion-base:0.13.2. Update it accordingly for newer versions
once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
incompatibilities.
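The operator arguments described above can be sketched as a plain dictionary. The command `["python", "main.py"]` and the env var names `config` and `pipelineType` are assumptions inferred from the description of the image's main.py script; the helper name `pod_operator_kwargs` is hypothetical.

```python
def pod_operator_kwargs(config_yaml: str, pipeline_type: str, server_version: str) -> dict:
    """Collect the KubernetesPodOperator arguments for one ingestion task."""
    return {
        # task_id and name can be chosen freely.
        "task_id": "ingest",
        "name": "ingest",
        # Assumed entrypoint of the image; it should not be changed.
        "cmds": ["python", "main.py"],
        # Keep the image tag aligned with the OpenMetadata server version.
        "image": f"openmetadata/ingestion-base:{server_version}",
        # Only the values change between workflows, never the keys.
        "env_vars": {"config": config_yaml, "pipelineType": pipeline_type},
    }


kwargs = pod_operator_kwargs("source: {}", "metadata", "0.13.2")
print(kwargs["image"])
```

Inside a DAG, the result would be unpacked as `KubernetesPodOperator(**kwargs)`. Further arguments, such as the namespace or the cluster connection, depend on the Composer environment.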
For more information about the KubernetesPodOperator and how to tune its configurations, see the operator's official documentation.
Note that depending on the kind of workflow you will be deploying, the YAML configuration will need to be updated following
the official OpenMetadata docs, and the value of the pipelineType configuration will need to hold one of the following values:
- metadata
- usage
- lineage
- profiler
- TestSuite
These values are based on the PipelineType JSON Schema definitions.
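Because the values are case-sensitive (note the capitalization of TestSuite), it can help to validate pipelineType before passing it to the pod. This is a minimal sketch; the function name `check_pipeline_type` is hypothetical.

```python
# Accepted pipelineType values, as listed above.
PIPELINE_TYPES = ("metadata", "usage", "lineage", "profiler", "TestSuite")


def check_pipeline_type(value: str) -> str:
    """Return the value unchanged if it is valid, otherwise raise ValueError."""
    if value not in PIPELINE_TYPES:
        raise ValueError(
            f"Unknown pipelineType {value!r}; expected one of {', '.join(PIPELINE_TYPES)}"
        )
    return value
```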