"""
dbt + Collate Integration DAG (GCS Method)
This DAG:
1. Runs dbt models
2. Runs dbt tests
3. Generates dbt documentation (catalog.json)
4. Uploads all artifacts to Google Cloud Storage
Perfect for Cloud Composer or GKE deployments.
"""
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
# =============================================================================
# CONFIGURATION
# =============================================================================
# dbt Configuration
DBT_PROJECT_DIR = os.getenv("DBT_PROJECT_DIR", "/home/airflow/gcs/dbt/my_project")
DBT_PROFILES_DIR = os.getenv("DBT_PROFILES_DIR", "/home/airflow/gcs/dbt")
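
# In Cloud Composer, /home/airflow/gcs/dags and /home/airflow/gcs/data are
# GCS-fuse mounts of the environment's bucket, so the dbt project and
# profiles.yml can be deployed by copying them into that bucket. The default
# paths above are placeholders -- point them at wherever your project actually lives.
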
# GCS Configuration
GCS_BUCKET = os.getenv("GCS_BUCKET", "your-company-dbt-artifacts")
GCS_PREFIX = os.getenv("GCS_PREFIX", "dbt")
GCP_PROJECT = os.getenv("GCP_PROJECT", "your-gcp-project")
# Service Account (if not using Workload Identity)
GOOGLE_APPLICATION_CREDENTIALS = os.getenv(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/home/airflow/gcs/dbt-sa-key.json",
)
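
# These values can also be injected as Airflow environment variables instead of
# editing the DAG. A sketch for Cloud Composer (environment name and location
# are placeholders, not values from this project):
#
#   gcloud composer environments update my-composer-env \
#       --location us-central1 \
#       --update-env-variables GCS_BUCKET=your-company-dbt-artifacts,GCS_PREFIX=dbt
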
# =============================================================================
# DAG DEFAULT ARGUMENTS
# =============================================================================
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["data-team@yourcompany.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}
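
# Note: email_on_failure only has an effect if an email backend (SMTP or an
# equivalent provider) is configured for the Airflow deployment; without one,
# Airflow logs an error when it tries to send the notification.
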
# =============================================================================
# PYTHON FUNCTIONS
# =============================================================================
def upload_artifacts_to_gcs(**context):
    """
    Upload dbt artifacts to Google Cloud Storage.

    Uses the google-cloud-storage library (pre-installed in Cloud Composer).
    For self-hosted Airflow: pip install google-cloud-storage
    """
    from google.cloud import storage

    # Initialize the GCS client: use an explicit service account key if one is
    # present, otherwise fall back to default credentials (Workload Identity or ADC).
    if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
        client = storage.Client.from_service_account_json(
            GOOGLE_APPLICATION_CREDENTIALS
        )
    else:
        client = storage.Client(project=GCP_PROJECT)

    bucket = client.bucket(GCS_BUCKET)
    target_dir = os.path.join(DBT_PROJECT_DIR, "target")
    # Artifacts to upload: (filename, required)
    artifacts = [
        ("manifest.json", True),      # Required; written by almost every dbt command
        ("catalog.json", False),      # Optional but recommended; written by `dbt docs generate`
        ("run_results.json", False),  # Optional; written by `dbt run` / `dbt test`
        ("sources.json", False),      # Optional; written by `dbt source freshness`
    ]
    uploaded = []
    failed = []

    for filename, required in artifacts:
        local_path = os.path.join(target_dir, filename)
        gcs_path = f"{GCS_PREFIX}/{filename}"

        if os.path.exists(local_path):
            try:
                blob = bucket.blob(gcs_path)
                blob.upload_from_filename(local_path)
                uploaded.append(filename)
                print(f"✓ Uploaded {filename} to gs://{GCS_BUCKET}/{gcs_path}")
            except Exception as e:
                error_msg = f"✗ Failed to upload {filename}: {e}"
                print(error_msg)
                if required:
                    raise Exception(error_msg)
                failed.append(filename)
        elif required:
            raise FileNotFoundError(
                f"Required artifact not found: {local_path}\n"
                "Make sure 'dbt run' completed successfully."
            )
        else:
            print(f"⊘ Skipping {filename} (not found - optional)")
    # Log summary
    print(f"\n{'=' * 50}")
    print("Upload Summary:")
    print(f"  Uploaded: {', '.join(uploaded) or 'None'}")
    print(f"  Failed:   {', '.join(failed) or 'None'}")
    print(f"  GCS Location: gs://{GCS_BUCKET}/{GCS_PREFIX}/")
    print(f"{'=' * 50}")

    return {"uploaded": uploaded, "bucket": GCS_BUCKET, "prefix": GCS_PREFIX}
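

# The Collate side of this integration is configured separately: point Collate's
# dbt ingestion at gs://<GCS_BUCKET>/<GCS_PREFIX>/ so it can read the
# manifest.json and catalog.json this DAG uploads. The exact ingestion settings
# depend on your Collate version; consult its dbt ingestion documentation.
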
# =============================================================================
# DAG DEFINITION
# =============================================================================
with DAG(
    dag_id="dbt_with_gcs",
    default_args=default_args,
    description="Run dbt models and sync metadata to Collate via GCS",
    schedule_interval="0 6 * * *",  # Daily at 06:00 UTC (Airflow 2.4+ prefers `schedule=`)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["dbt", "collate", "gcs", "data-pipeline"],
) as dag:
    # Task group: dbt execution
    with TaskGroup(group_id="dbt_execution") as dbt_tasks:
        dbt_run = BashOperator(
            task_id="dbt_run",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt run --profiles-dir {DBT_PROFILES_DIR}
            """,
        )

        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt test --profiles-dir {DBT_PROFILES_DIR}
            """,
            # Run tests even when some models fail so partial results are still validated.
            trigger_rule="all_done",
        )

        dbt_docs = BashOperator(
            task_id="dbt_docs_generate",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt docs generate --profiles-dir {DBT_PROFILES_DIR}
            """,
        )

        dbt_run >> dbt_test >> dbt_docs
    # Upload artifacts to GCS
    # (provide_context is no longer needed: in Airflow 2.x the task context is
    # passed to the callable's **kwargs automatically.)
    upload_to_gcs = PythonOperator(
        task_id="upload_artifacts_to_gcs",
        python_callable=upload_artifacts_to_gcs,
    )
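
    # Alternative (not wired into this DAG): for single files, the Google provider's
    # transfer operator can replace the custom upload function. A sketch, assuming
    # apache-airflow-providers-google is installed and a GCP connection exists:
    #
    #   from airflow.providers.google.cloud.transfers.local_to_gcs import (
    #       LocalFilesystemToGCSOperator,
    #   )
    #
    #   upload_manifest = LocalFilesystemToGCSOperator(
    #       task_id="upload_manifest_to_gcs",
    #       src=f"{DBT_PROJECT_DIR}/target/manifest.json",
    #       dst=f"{GCS_PREFIX}/manifest.json",
    #       bucket=GCS_BUCKET,
    #       gcp_conn_id="google_cloud_default",
    #   )
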
    # DAG dependencies
    dbt_tasks >> upload_to_gcs